diff --git a/GEMstack/onboard/planning/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py index cb1e3cf03..61ad9240d 100644 --- a/GEMstack/onboard/planning/longitudinal_planning.py +++ b/GEMstack/onboard/planning/longitudinal_planning.py @@ -1,4 +1,4 @@ -from typing import List, Tuple +from typing import List, Tuple, Union from ..component import Component from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, ObjectFrameEnum, AgentState from ...utils import serialization @@ -9,7 +9,7 @@ import matplotlib.pyplot as plt import matplotlib.patches as patches import math - +from scipy.optimize import minimize ################################################################################ @@ -360,6 +360,107 @@ def detect_collision(curr_x: float, curr_y: float, curr_v: float, obj: AgentStat +def detect_collision_analytical(r_pedestrain_x: float, r_pedestrain_y: float, p_vehicle_left_y_after_t: float, p_vehicle_right_y_after_t: float, lateral_buffer: float) -> Union[bool, str]: + """Detects if a collision will occur with the given object and return deceleration to avoid it. Analytical""" + if r_pedestrain_x < 0 and abs(r_pedestrain_y) > lateral_buffer: + return False + elif r_pedestrain_x < 0: + return 'max' + if r_pedestrain_y >= p_vehicle_left_y_after_t and r_pedestrain_y <= p_vehicle_right_y_after_t: + return True + + return False + + +def get_minimum_deceleration_for_collision_avoidance(curr_x: float, curr_y: float, curr_v: float, obj: AgentState, min_deceleration: float, max_deceleration: float) -> Tuple[bool, float]: + """Detects if a collision will occur with the given object and return deceleration to avoid it. Via Optimization""" + + # Get the object's position and velocity + obj_x = obj.pose.x + obj_y = obj.pose.y + obj_v_x = obj.velocity[0] + obj_v_y = obj.velocity[1] + + + vehicle_length = 3 + vehicle_width = 2 + + vehicle_buffer_x = 3.0 + vehicle_buffer_y = 1.0 + + obj_x = obj_x - curr_x + obj_y = obj_y - curr_y + + curr_x = curr_x - curr_x + curr_y = curr_y - curr_y + + vehicle_front = curr_x + vehicle_length + vehicle_buffer_x + vehicle_back = curr_x + vehicle_left = curr_y - vehicle_width / 2 + vehicle_right = curr_y + vehicle_width / 2 + + r_vehicle_front = vehicle_front - vehicle_front + r_vehicle_back = vehicle_back - vehicle_front + r_vehicle_left = vehicle_left - vehicle_buffer_y + r_vehicle_right = vehicle_right + vehicle_buffer_y + r_vehicle_v_x = curr_v + r_vehicle_v_y = 0 + + r_pedestrain_x = obj_x - vehicle_front + r_pedestrain_y = -obj_y + r_pedestrain_v_x = obj_v_x + r_pedestrain_v_y = -obj_v_y + + r_velocity_x_from_vehicle = r_vehicle_v_x - r_pedestrain_v_x + r_velocity_y_from_vehicle = r_vehicle_v_y - r_pedestrain_v_y + + t_to_r_pedestrain_x = (r_pedestrain_x - r_vehicle_front) / r_velocity_x_from_vehicle + + p_vehicle_left_y_after_t = r_vehicle_left + r_velocity_y_from_vehicle * t_to_r_pedestrain_x + p_vehicle_right_y_after_t = r_vehicle_right + r_velocity_y_from_vehicle * t_to_r_pedestrain_x + + collision_flag = detect_collision_analytical(r_pedestrain_x, r_pedestrain_y, p_vehicle_left_y_after_t, p_vehicle_right_y_after_t, vehicle_buffer_y) + if collision_flag == False: + print("No collision", curr_x, curr_y, r_pedestrain_x, r_pedestrain_y, r_vehicle_left, r_vehicle_right, p_vehicle_left_y_after_t, p_vehicle_right_y_after_t) + return 0.0, r_pedestrain_x + elif collision_flag == 'max': + return max_deceleration, r_pedestrain_x + + print("Collision", curr_x, curr_y, r_pedestrain_x, r_pedestrain_y, r_vehicle_left, r_vehicle_right, p_vehicle_left_y_after_t, p_vehicle_right_y_after_t) + yaw = None + minimum_deceleration = None + if yaw is None: + if abs(r_velocity_y_from_vehicle) > 0.1: + if r_velocity_y_from_vehicle > 0.1: + # Vehicle Left would be used to yield + r_pedestrain_y_temp = r_pedestrain_y + abs(r_vehicle_left) + elif r_velocity_y_from_vehicle < -0.1: + # Vehicle Right would be used to yield + r_pedestrain_y_temp = r_pedestrain_y - abs(r_vehicle_right) + + softest_accleration = 2 * r_velocity_y_from_vehicle * (r_velocity_y_from_vehicle * r_pedestrain_x - r_velocity_x_from_vehicle * r_pedestrain_y_temp) / r_pedestrain_y_temp**2 + peak_y = -(r_velocity_x_from_vehicle * r_velocity_y_from_vehicle) / softest_accleration + # if the peak is within the position of the pedestrian, + # then it indicates the path had already collided with the pestrain, + # and so the softest acceleration should be the one the peak of the path is the same as the pedestrain's x position + # and the vehicle should be stopped exactly before the pedestrain's x position + if abs(peak_y) > abs(r_pedestrain_y_temp): + minimum_deceleration = abs(softest_accleration) + # else: the vehicle should be stopped exactly before the pedestrain's x position the same case as the pedestrain barely move laterally + if minimum_deceleration is None: + minimum_deceleration = r_velocity_x_from_vehicle**2 / (2 * r_pedestrain_x) + else: + pass + + print("calculatedminimum_deceleration: ", minimum_deceleration) + + if minimum_deceleration < min_deceleration: + return 0.0, r_pedestrain_x + else: + return max(min(minimum_deceleration, max_deceleration), min_deceleration), r_pedestrain_x + + + ################################################################################ ########## Longitudinal Planning ############################################### ################################################################################ @@ -1079,6 +1180,9 @@ def longitudinal_brake(path : Path, deceleration : float, current_speed : float) ################################################################################ +########################## +##### Patrick's Code ##### +########################## class YieldTrajectoryPlanner(Component): """Follows the given route. Brakes if you have to yield or @@ -1086,18 +1190,18 @@ class YieldTrajectoryPlanner(Component): the desired speed. """ - def __init__(self, mode : str = 'real', planner : str = 'milestone'): + def __init__(self, mode : str = 'real', params : dict = {"planner": "dt", "desired_speed": 1.0, "acceleration": 0.5}): self.route_progress = None self.t_last = None - self.acceleration = 0.5 - self.desired_speed = 1.0 + self.acceleration = params["acceleration"] + self.desired_speed = params["desired_speed"] self.deceleration = 2.0 self.min_deceleration = 1.0 self.max_deceleration = 8.0 self.mode = mode - self.planner = planner + self.planner = params["planner"] def state_inputs(self): return ['all'] @@ -1128,15 +1232,14 @@ def update(self, state : AllState): abs_y = curr_y + state.start_vehicle_pose.y ############################################### - print("@@@@@ VEHICLE STATE @@@@@") - print(vehicle) - print("@@@@@@@@@@@@@@@@@@@@@@@@@") + # print("@@@@@ VEHICLE STATE @@@@@") + # print(vehicle) + # print("@@@@@@@@@@@@@@@@@@@@@@@@@") if self.mode == 'real': # Position in vehicle frame (Start (0,0) to (15,0)) - curr_x = vehicle.pose.x * 20 - curr_y = vehicle.pose.y * 20 - curr_v = vehicle.v + # curr_x = vehicle.pose.x * 20 + # curr_y = vehicle.pose.y * 20 # print("@@@@@ PLAN", curr_x, curr_y, curr_v) abs_x = curr_x abs_y = curr_y @@ -1165,10 +1268,6 @@ def update(self, state : AllState): #get the object we are yielding to obj = state.agents[r.obj2] - print("@@@@@ AGENT STATE @@@@@") - print(obj) - print("@@@@@@@@@@@@@@@@@@@@@@@") - if self.mode == 'real': obj.pose.x = obj.pose.x + curr_x obj.pose.y = obj.pose.y + curr_y @@ -1197,26 +1296,30 @@ def update(self, state : AllState): #choose whether to accelerate, brake, or keep at current velocity if should_accelerate: - traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, curr_v, "milestone") + traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, curr_v, self.planner) elif should_yield: traj = longitudinal_brake(route_with_lookahead, yield_deceleration, curr_v) else: - traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v, "dt") + traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v, self.planner) return traj +# ######################## +# ##### Yudai's Code ##### +# ######################## + # class YieldTrajectoryPlanner(Component): # """Follows the given route. Brakes if you have to yield or # you are at the end of the route, otherwise accelerates to # the desired speed. # """ -# def __init__(self, mode : str = 'real', planner : str = 'milestone'): +# def __init__(self, mode : str = 'real', params : dict = {}): # self.route_progress = None # self.t_last = None -# self.acceleration = 0.75 # 0.5 is not enough to start -# self.desired_speed = 1.0 # cannot got more than 1.5 m/s +# self.acceleration = params["acceleration"] +# self.desired_speed = params["desired_speed"] # # Yielding parameters # # Yielding speed [..., 1.0, 0.8, ..., 0.2] @@ -1227,7 +1330,7 @@ def update(self, state : AllState): # # Planner mode # self.mode = mode -# self.planner = planner +# self.planner = params["planner"] # def state_inputs(self): # return ['all'] @@ -1260,15 +1363,14 @@ def update(self, state : AllState): # ############################################### # # # TODO: Fix the coordinate conversion of other files -# print("@@@@@ VEHICLE STATE @@@@@") -# print(vehicle) -# print("@@@@@@@@@@@@@@@@@@@@@@@@@") +# # print("@@@@@ VEHICLE STATE @@@@@") +# # print(vehicle) +# # print("@@@@@@@@@@@@@@@@@@@@@@@@@") # if self.mode == 'real': # # Position in vehicle frame (Start (0,0) to (15,0)) -# curr_x = vehicle.pose.x * 20 -# curr_y = vehicle.pose.y * 20 -# curr_v = vehicle.v +# # curr_x = vehicle.pose.x * 20 +# # curr_y = vehicle.pose.y * 20 # # print("@@@@@ PLAN", curr_x, curr_y, curr_v) # abs_x = curr_x # abs_y = curr_y @@ -1291,9 +1393,9 @@ def update(self, state : AllState): # # Default values # should_brake = False -# desired_speed = self.desired_speed -# accel = self.acceleration -# decel = self.deceleration +# input_values = [{"decel": self.deceleration, "desired_speed": self.desired_speed, "collision_distance": lookahead_distance}] + +# print(state.relations) # for r in state.relations: # if r.type == EntityRelationEnum.YIELDING and r.obj1 == '': @@ -1328,9 +1430,9 @@ def update(self, state : AllState): # GLOBAL = 2 #position in longitude / latitude, yaw=heading in radians with respect to true north (used in GNSS) # ABSOLUTE_CARTESIAN = 3 #position / yaw in m / radians relative to a global cartesian reference frame (used in simulation) # """ -# print("@@@@@ AGENT STATE @@@@@") -# print(a) -# print("@@@@@@@@@@@@@@@@@@@@@@@") +# # print("@@@@@ AGENT STATE @@@@@") +# # print(a) +# # print("@@@@@@@@@@@@@@@@@@@@@@@") # # Pedestrian parameters. # x2, y2 = a.pose.x, a.pose.y @@ -1352,12 +1454,14 @@ def update(self, state : AllState): # print(f"Pedestrian: ({x2:.1f}, {y2:.1f}, ({v2[0]:.1f}, {v2[1]:.1f}))") # # Simulate if a collision will occur when the vehicle accelerate to desired speed. -# sim = CollisionDetector(x1, y1, 0, x2, y2, 0, v1, v2, total_time, desired_speed, accel) +# sim = CollisionDetector(x1, y1, 0, x2, y2, 0, v1, v2, total_time, self.desired_speed, self.acceleration) # collision_distance = sim.run() # # No collision detected with acceleration to desired speed. # if collision_distance < 0: # print("No collision detected.") +# input_values.append({"decel": self.deceleration, "desired_speed": self.desired_speed, "collision_distance": collision_distance}) +# continue # # Collision detected with acceleration to desired speed. # # => Check if the vehicle can yield to the pedestrian with deceleration. @@ -1366,20 +1470,25 @@ def update(self, state : AllState): # ############################################### # # # UNCOMMENT NOT TO YIELD: JUST STOP FOR PART1 # # print("The vehicle is Stopping.") +# # print("@@@@@", a.pose.x) + +# # # Update the collision distance. +# # if lookahead_distance_to_pedestrian > collision_distance: +# # lookahead_distance_to_pedestrian = collision_distance + # # # Decide the deceleration based on the collision distance. -# # brake_deceleration = max(self.deceleration, v1[0]**2 / (2 * (collision_distance))) -# # if brake_deceleration > self.deceleration: -# # if brake_deceleration > self.max_deceleration: -# # brake_deceleration = self.max_deceleration +# # # To stop perfectly, assume the vehicle is running at the desired speed. +# # brake_deceleration = max(self.deceleration, desired_speed**2 / (2 * (collision_distance))) +# # if brake_deceleration > self.max_deceleration: +# # brake_deceleration = self.max_deceleration + +# # if brake_deceleration > decel: # # decel = brake_deceleration if brake_deceleration > decel else decel # # should_brake = True -# # break # # break # ############################################### # print("Collision detected. Try to find yielding speed.") -# # Update lookahead distance to pedestrian. -# route_with_lookahead = route.trim(closest_parameter, closest_parameter + collision_distance) # collision_distance_after_yield = -1 @@ -1388,30 +1497,30 @@ def update(self, state : AllState): # for v in self.yield_speed: # # Simulate if the vehicle can yield to the pedestrian with acceleration to yielding speed. # if v > v1[0]: -# sim.set_params(v, accel) +# sim.set_params(v, self.acceleration) # # Simulate if the vehicle can yield to the pedestrian with deceleration to yielding speed. # else: # sim.set_params(v, self.yield_deceleration * -1.0) # collision_distance_after_yield = sim.run() # if collision_distance_after_yield < 0: # print(f"Yielding at speed: {v}") -# desired_speed = v if v < desired_speed else desired_speed -# decel = self.yield_deceleration if self.yield_deceleration > decel else decel +# input_values.append({"decel": self.yield_deceleration, "desired_speed": v, "collision_distance": collision_distance}) # break # # Collision detected with any yielding speed. # # => Brake to avoid collision. -# if collision_distance_after_yield > 0: +# if collision_distance_after_yield >= 0: # print("The vehicle is Stopping.") # # Decide the deceleration based on the collision distance. # brake_deceleration = max(self.deceleration, v1[0]**2 / (2 * (collision_distance))) # if brake_deceleration > self.max_deceleration: # brake_deceleration = self.max_deceleration -# decel = brake_deceleration if brake_deceleration > decel else decel +# input_values.append({"decel": brake_deceleration, "desired_speed": v1[0], "collision_distance": collision_distance}) # should_brake = True -# break - +# # You need to break state.relation loop to avoid unnecessary computation. +# break + # # # UNCOMMENT TO BRAKE FOR ALL PEDESTRIANS # # should_brake = True # # desired_speed = 0.0 @@ -1424,6 +1533,15 @@ def update(self, state : AllState): # #========================= +# # Choose the maximum deceleration from input_values. +# print("Input values:", input_values) +# decel = min(input_values, key=lambda x: x['desired_speed'])['decel'] +# desired_speed = min(input_values, key=lambda x: x['desired_speed'])['desired_speed'] +# collision_distance = min(input_values, key=lambda x: x['desired_speed'])['collision_distance'] + +# # Update the lookahead distance to pedestrian. +# route_with_lookahead = route.trim(closest_parameter,closest_parameter + collision_distance) + # print(f"Desired speed: {desired_speed:.2f} m/s") # print(f"Deceleration: {decel:.2f} m/s^2") @@ -1432,7 +1550,7 @@ def update(self, state : AllState): # # traj = longitudinal_plan(route_to_end, self.acceleration, self.deceleration, self.desired_speed, curr_v, "milestone") # #choose whether to accelerate, brake, or keep at current velocity # if should_accelerate: -# traj = longitudinal_plan(route_with_lookahead, accel, decel, desired_speed, curr_v, self.planner) +# traj = longitudinal_plan(route_with_lookahead, self.acceleration, decel, desired_speed, curr_v, self.planner) # elif should_brake: # traj = longitudinal_brake(route_with_lookahead, decel, curr_v) # else: @@ -1441,3 +1559,122 @@ def update(self, state : AllState): # print(f"Simulation took {time.time() - start_time:.3f} seconds.") # return traj + + +# ######################## +# ##### Henry's Code ##### +# ######################## +# class YieldTrajectoryPlanner(Component): +# """Follows the given route. Brakes if you have to yield or +# you are at the end of the route, otherwise accelerates to +# the desired speed. +# """ + +# def __init__(self, mode : str = 'real', params : dict = {}): +# self.route_progress = None +# self.t_last = None +# self.acceleration = params["acceleration"] +# self.desired_speed = params["desired_speed"] +# self.deceleration = 2.0 + +# self.min_deceleration = 1.0 +# self.max_deceleration = 8.0 + +# self.mode = mode +# self.planner = params["planner"] + +# def state_inputs(self): +# return ['all'] + +# def state_outputs(self) -> List[str]: +# return ['trajectory'] + +# def rate(self): +# return 10.0 + +# def update(self, state : AllState): +# start_time = time.time() + +# vehicle = state.vehicle # type: VehicleState +# route = state.route # type: Route +# t = state.t + +# if self.t_last is None: +# self.t_last = t +# dt = t - self.t_last + +# # Position in vehicle frame (Start (0,0) to (15,0)) +# curr_x = vehicle.pose.x +# curr_y = vehicle.pose.y +# curr_v = vehicle.v + +# abs_x = curr_x + state.start_vehicle_pose.x +# abs_y = curr_y + state.start_vehicle_pose.y + + # ############################################### + # # print("@@@@@ VEHICLE STATE @@@@@") + # # print(vehicle) + # # print("@@@@@@@@@@@@@@@@@@@@@@@@@") + + # if self.mode == 'real': + # # Position in vehicle frame (Start (0,0) to (15,0)) + # # curr_x = vehicle.pose.x * 20 + # # curr_y = vehicle.pose.y * 20 + # # print("@@@@@ PLAN", curr_x, curr_y, curr_v) + # abs_x = curr_x + # abs_y = curr_y + # # print("@@@@@ PLAN", abs_x, abs_y) + # ############################################### + +# #figure out where we are on the route +# if self.route_progress is None: +# self.route_progress = 0.0 +# closest_dist,closest_parameter = state.route.closest_point_local((curr_x,curr_y),[self.route_progress-5.0,self.route_progress+5.0]) +# self.route_progress = closest_parameter + +# lookahead_distance = max(10, curr_v**2 / (2 * self.deceleration)) +# route_with_lookahead = route.trim(closest_parameter,closest_parameter + lookahead_distance) +# print("Lookahead distance:", lookahead_distance) + +# route_to_end = route.trim(closest_parameter, len(route.points) - 1) + +# should_yield = False +# yield_deceleration = 0.0 + +# print("Current Speed: ", curr_v) + +# for r in state.relations: +# if r.type == EntityRelationEnum.YIELDING and r.obj1 == '': +# #get the object we are yielding to +# obj = state.agents[r.obj2] + +# if self.mode == 'real': +# obj.pose.x = obj.pose.x + curr_x +# obj.pose.y = obj.pose.y + curr_y + +# deceleration, r_pedestrain_x = get_minimum_deceleration_for_collision_avoidance(abs_x, abs_y, curr_v, obj, self.min_deceleration, self.max_deceleration) +# print("deceleration: ", deceleration) +# if deceleration > 0: +# yield_deceleration = max(deceleration, yield_deceleration) +# should_yield = True + +# print("should yield: ", should_yield) + +# should_accelerate = (not should_yield and curr_v < self.desired_speed) + +# #choose whether to accelerate, brake, or keep at current velocity +# if should_accelerate: +# traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, curr_v, "milestone") +# elif should_yield: +# desired_speed = math.sqrt(-2 * yield_deceleration * r_pedestrain_x + curr_v**2) +# desired_speed = max(desired_speed, 0) +# # traj = longitudinal_brake(route_with_lookahead, yield_deceleration, curr_v) +# if desired_speed > 0: +# traj = longitudinal_plan(route_with_lookahead, 0, yield_deceleration, desired_speed, curr_v, "dt") +# else: +# traj = longitudinal_brake(route_with_lookahead, yield_deceleration, curr_v) +# else: +# traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v, "dt") + +# return traj + \ No newline at end of file diff --git a/GEMstack/onboard/planning/testing.ipynb b/GEMstack/onboard/planning/testing.ipynb new file mode 100644 index 000000000..1c81ff742 --- /dev/null +++ b/GEMstack/onboard/planning/testing.ipynb @@ -0,0 +1,603 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "v1 84.8528137423857\n", + "a1x [0.1]\n", + "v1 84.8528137423857\n", + "P1 7.0710678118654755\n", + "t_c 0.07276090201378733\n", + "P1_new [4.36538941 4.36565412]\n", + "P2_new [5.0727609 5.0727609]\n", + "distance 1.0001871939422702\n", + "a1x [0.1]\n", + "v1 84.8528137423857\n", + "P1 7.0710678118654755\n", + "t_c 0.07276090201378733\n", + "P1_new [4.36538941 4.36565412]\n", + "P2_new [5.0727609 5.0727609]\n", + "distance 1.0001871939422702\n", + "a1x [0.1]\n", + "v1 84.8528137423857\n", + "P1 7.0710678118654755\n", + "t_c 0.07276090201378733\n", + "P1_new [4.36538941 4.36565412]\n", + "P2_new [5.0727609 5.0727609]\n", + "distance 1.0001871939422702\n", + "a1x [0.10000001]\n", + "v1 84.8528137423857\n", + "P1 7.0710678118654755\n", + "t_c 0.07276090201378733\n", + "P1_new [4.36538941 4.36565412]\n", + "P2_new [5.0727609 5.0727609]\n", + "distance 1.0001871939701665\n", + "a1x [0.001]\n", + "v1 84.8528137423857\n", + "P1 7.0710678118654755\n", + "t_c 0.07276090201378733\n", + "P1_new [4.36565147 4.36565412]\n", + "P2_new [5.0727609 5.0727609]\n", + "distance 1.0000018717660313\n", + "a1x [0.001]\n", + "v1 84.8528137423857\n", + "P1 7.0710678118654755\n", + "t_c 0.07276090201378733\n", + "P1_new [4.36565147 4.36565412]\n", + "P2_new [5.0727609 5.0727609]\n", + "distance 1.0000018717660313\n", + "a1x [0.00100001]\n", + "v1 84.8528137423857\n", + "P1 7.0710678118654755\n", + "t_c 0.07276090201378733\n", + "P1_new [4.36565147 4.36565412]\n", + "P2_new [5.0727609 5.0727609]\n", + "distance 1.0000018717939232\n", + "Optimal Deceleration (Braking): 0.0010000000000002507\n" + ] + } + ], + "source": [ + "import numpy as np\n", + "from scipy.optimize import minimize\n", + "\n", + "def distance(P1, P2):\n", + " \"\"\" Compute Euclidean distance between two points \"\"\"\n", + " return np.linalg.norm(P1 - P2)\n", + "\n", + "def objective(a1):\n", + " \"\"\" Objective function: minimize deceleration a1 \"\"\"\n", + " return a1 # Minimize braking force\n", + "\n", + "def detect_collision(P1, P2, V1, V2, r1, r2):\n", + " \"\"\" Detects if a collision will occur with the given object and return deceleration to avoid it.\"\"\"\n", + " delta_p = P2 - P1\n", + " delta_v = V1 - V2\n", + "\n", + " # Normalize the vectors\n", + " norm_p = np.linalg.norm(delta_p)\n", + " norm_v = np.linalg.norm(delta_v)\n", + "\n", + " tol = 1e-6\n", + " # Avoid division by zero\n", + " if norm_p < tol or norm_v < tol:\n", + " return False\n", + "\n", + " unit_p = delta_p / norm_p # Normalized ΔP\n", + " unit_v = delta_v / norm_v # Normalized ΔV\n", + "\n", + " # Check if unit vectors are nearly identical\n", + " return np.allclose(unit_p, unit_v, atol=tol)\n", + "\n", + "def constraint_finite_difference(a1x, v1, v2, P1, P2, V1, V2, r1, r2):\n", + " \"\"\" Ensure Object 1 does not collide with Object 2 while braking \"\"\"\n", + "\n", + " print('a1x', a1x)\n", + " if a1x <= 0:\n", + " return -1 # Ensure positive deceleration\n", + " print('v1', v1)\n", + " print('P1', distance(P1, P2))\n", + " t_c = (distance(P1, P2) - (r1 + r2)) / (v1 - v2)\n", + " print('t_c', t_c)\n", + " P1_new = P1 + V1 * t_c - 0.5 * np.array([a1x[0], 0]) * t_c**2\n", + " P2_new = P2 + V2 * t_c # Object 2 continues normally\n", + " print('P1_new', P1_new)\n", + " print('P2_new', P2_new)\n", + " print('distance', distance(P1_new, P2_new))\n", + " # Ensure new positions do not result in a collision\n", + " return distance(P1_new, P2_new) - (r1 + r2)\n", + "\n", + "\n", + "# Example input\n", + "P1 = np.array([0, 0])\n", + "V1 = np.array([60, 60]) # Initial velocity of Object 1\n", + "P2 = np.array([5, 5])\n", + "V2 = np.array([1, 1]) # Velocity of Object 2\n", + "v1 = np.linalg.norm(V1) # Compute initial speed\n", + "v2 = np.linalg.norm(V2)\n", + "print('v1', v1)\n", + "r1 = 0.5 # Radius of Object 1\n", + "r2 = 0.5 # Radius of Object 2\n", + "\n", + "# Initial guess: a1\n", + "a1x0 = 0.1 # Start with a small deceleration\n", + "\n", + "# Bounds: (a1 in (0, 1] to ensure positive braking)\n", + "bounds = [(1e-3, 1)] # Prevent zero deceleration\n", + "\n", + "# Solve optimization problem\n", + "res = minimize(\n", + " objective, \n", + " a1x0, \n", + " bounds=bounds,\n", + " constraints={'type': 'ineq', 'fun': constraint_finite_difference, 'args': (v1, v2, P1, P2, V1, V2, r1, r2)}\n", + ")\n", + "\n", + "# Optimal deceleration\n", + "optimal_a1 = res.x[0]\n", + "print(\"Optimal Deceleration (Braking):\", optimal_a1)\n", + "\n", + "# # Compute braking duration based on optimal deceleration\n", + "# optimal_t_d = (s1 - (distance(P1, P2) / (distance(P1, P2) / (distance(P1, P2) / s1 + d_safe / s1)))) / optimal_a1\n", + "# print(\"Braking Duration (t_d):\", optimal_t_d)\n", + "\n", + "# # Compute new velocity after braking (only reducing speed, not direction)\n", + "# scaling_factor = 1 - (optimal_a1 * optimal_t_d / s1)\n", + "# V1_new = scaling_factor * V1\n", + "# print(\"New Velocity After Braking:\", V1_new)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "7.0710678118654755\n" + ] + } + ], + "source": [ + "import math\n", + "print(math.sqrt(50))" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2.146446609406726\n" + ] + } + ], + "source": [ + "\n", + "print((math.sqrt(50) - 1) / 2.8284271247461903)" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Collision detected. Computing optimal braking force...\n", + "a1x [0.1]\n", + "a1x [0.1]\n", + "a1x [0.1]\n", + "a1x [0.10000001]\n", + "a1x [0.14386871]\n", + "a1x [0.10438687]\n", + "a1x [0.10438687]\n", + "a1x [0.10438689]\n", + "a1x [0.14369264]\n", + "a1x [0.10831745]\n", + "a1x [0.10831745]\n", + "a1x [0.10831746]\n", + "a1x [0.14355521]\n", + "a1x [0.11184122]\n", + "a1x [0.11184122]\n", + "a1x [0.11184124]\n", + "a1x [0.1434475]\n", + "a1x [0.11500185]\n", + "a1x [0.11500185]\n", + "a1x [0.11500187]\n", + "a1x [0.14336277]\n", + "a1x [0.11783794]\n", + "a1x [0.11783794]\n", + "a1x [0.11783796]\n", + "a1x [0.14329589]\n", + "a1x [0.12038374]\n", + "a1x [0.12038374]\n", + "a1x [0.12038375]\n", + "a1x [0.14324297]\n", + "a1x [0.12266966]\n", + "a1x [0.12266966]\n", + "a1x [0.12266968]\n", + "a1x [0.14320024]\n", + "a1x [0.12472272]\n", + "a1x [0.12472272]\n", + "a1x [0.12472273]\n", + "a1x [0.14318682]\n", + "a1x [0.12656913]\n", + "a1x [0.12656913]\n", + "a1x [0.12656914]\n", + "a1x [0.1431409]\n", + "a1x [0.12822631]\n", + "a1x [0.12822631]\n", + "a1x [0.12822632]\n", + "a1x [0.14311966]\n", + "a1x [0.12971564]\n", + "a1x [0.12971564]\n", + "a1x [0.12971566]\n", + "a1x [0.14310267]\n", + "a1x [0.13105435]\n", + "a1x [0.13105435]\n", + "a1x [0.13105436]\n", + "a1x [0.14308908]\n", + "a1x [0.13225782]\n", + "a1x [0.13225782]\n", + "a1x [0.13225783]\n", + "a1x [0.14307818]\n", + "a1x [0.13333985]\n", + "a1x [0.13333985]\n", + "a1x [0.13333987]\n", + "a1x [0.14306944]\n", + "a1x [0.13431281]\n", + "a1x [0.13431281]\n", + "a1x [0.13431283]\n", + "a1x [0.14306244]\n", + "a1x [0.13518778]\n", + "a1x [0.13518778]\n", + "a1x [0.13518779]\n", + "a1x [0.14305574]\n", + "a1x [0.13597457]\n", + "a1x [0.13597457]\n", + "a1x [0.13597459]\n", + "a1x [0.14305222]\n", + "a1x [0.13668234]\n", + "a1x [0.13668234]\n", + "a1x [0.13668235]\n", + "a1x [0.14304857]\n", + "a1x [0.13731896]\n", + "a1x [0.13731896]\n", + "a1x [0.13731898]\n", + "a1x [0.14304562]\n", + "a1x [0.13789163]\n", + "a1x [0.13789163]\n", + "a1x [0.13789164]\n", + "a1x [0.14304325]\n", + "a1x [0.13840679]\n", + "a1x [0.13840679]\n", + "a1x [0.1384068]\n", + "a1x [0.14304133]\n", + "a1x [0.13887024]\n", + "a1x [0.13887024]\n", + "a1x [0.13887026]\n", + "a1x [0.14303979]\n", + "a1x [0.1392872]\n", + "a1x [0.1392872]\n", + "a1x [0.13928721]\n", + "a1x [0.14303861]\n", + "a1x [0.13966234]\n", + "a1x [0.13966234]\n", + "a1x [0.13966235]\n", + "a1x [0.14303753]\n", + "a1x [0.13999986]\n", + "a1x [0.13999986]\n", + "a1x [0.13999987]\n", + "a1x [0.14303672]\n", + "a1x [0.14030354]\n", + "a1x [0.14030354]\n", + "a1x [0.14030356]\n", + "a1x [0.14303606]\n", + "a1x [0.1405768]\n", + "a1x [0.1405768]\n", + "a1x [0.14057681]\n", + "a1x [0.14303553]\n", + "a1x [0.14082267]\n", + "a1x [0.14082267]\n", + "a1x [0.14082268]\n", + "a1x [0.1430351]\n", + "a1x [0.14104391]\n", + "a1x [0.14104391]\n", + "a1x [0.14104393]\n", + "a1x [0.14303475]\n", + "a1x [0.141243]\n", + "a1x [0.141243]\n", + "a1x [0.14124301]\n", + "a1x [0.14303454]\n", + "a1x [0.14142215]\n", + "a1x [0.14142215]\n", + "a1x [0.14142217]\n", + "a1x [0.14303425]\n", + "a1x [0.14158336]\n", + "a1x [0.14158336]\n", + "a1x [0.14158338]\n", + "a1x [0.14303406]\n", + "a1x [0.14172843]\n", + "a1x [0.14172843]\n", + "a1x [0.14172845]\n", + "a1x [0.14303391]\n", + "a1x [0.14185898]\n", + "a1x [0.14185898]\n", + "a1x [0.14185899]\n", + "a1x [0.14303379]\n", + "a1x [0.14197646]\n", + "a1x [0.14197646]\n", + "a1x [0.14197648]\n", + "a1x [0.1430337]\n", + "a1x [0.14208218]\n", + "a1x [0.14208218]\n", + "a1x [0.1420822]\n", + "a1x [0.14303362]\n", + "a1x [0.14217733]\n", + "a1x [0.14217733]\n", + "a1x [0.14217734]\n", + "a1x [0.14303347]\n", + "a1x [0.14226294]\n", + "a1x [0.14226294]\n", + "a1x [0.14226296]\n", + "No feasible solution found. Consider adjusting parameters.\n" + ] + } + ], + "source": [ + "import numpy as np\n", + "from scipy.optimize import minimize\n", + "\n", + "def distance(P1, P2):\n", + " \"\"\" Compute Euclidean distance between two points \"\"\"\n", + " return np.linalg.norm(P1 - P2)\n", + "\n", + "def detect_collision(P1, P2, V1, V2, r1, r2):\n", + " \"\"\"\n", + " Checks if a collision is possible based on initial relative motion.\n", + " \"\"\"\n", + " delta_p = P2 - P1 # Displacement vector\n", + " delta_v = V2 - V1 # Relative velocity\n", + "\n", + " # Check if objects are moving toward each other\n", + " return np.dot(delta_p, delta_v) < 0\n", + "\n", + "def constraint_no_collision(a1x, P1, P2, V1, V2, r1, r2, t_final):\n", + " \"\"\"\n", + " Ensures that Object 1 does not collide with Object 2 during the movement.\n", + " \"\"\"\n", + "\n", + " # Simulate motion for multiple time steps\n", + " time_steps = np.linspace(0, t_final, num=100)\n", + " print('a1x', a1x)\n", + " for t in time_steps:\n", + " # Update Object 1's position (only decelerating in x-direction)\n", + " new_x = P1[0] + V1[0] * t - 0.5 * a1x[0] * t**2 # Apply braking in x direction\n", + " new_y = P1[1] + V1[1] * t # Y-direction remains unchanged\n", + " P1_new = np.array([new_x, new_y])\n", + "\n", + " # Update Object 2's position (continues moving)\n", + " P2_new = P2 + V2 * t\n", + "\n", + " # Check if collision occurs\n", + " if distance(P1_new, P2_new) < (r1 + r2):\n", + " return distance(P1_new, P2_new) - (r1 + r2) # Violation of collision constraint\n", + "\n", + " return distance(P1_new, P2_new) - (r1 + r2) # No collision\n", + "\n", + "def objective(a1x):\n", + " \"\"\" Objective function: minimize deceleration a1x \"\"\"\n", + " return a1x # Minimize braking force\n", + "\n", + "if __name__ == \"__main__\":\n", + " # Define initial conditions\n", + " P1 = np.array([0, 0]) # Initial position of Object 1\n", + " V1 = np.array([5, 2]) # Initial velocity of Object 1\n", + " P2 = np.array([10, 10]) # Initial position of Object 2\n", + " V2 = np.array([2, -1]) # Velocity of Object 2\n", + " r1, r2 = 1, 1 # Radii of the objects\n", + " t_final = 5 # Maximum time to check for collisions\n", + "\n", + " # Check if collision is even possible before optimization\n", + " if detect_collision(P1, P2, V1, V2, r1, r2):\n", + " print(\"Collision detected. Computing optimal braking force...\")\n", + "\n", + " # Initial guess for deceleration\n", + " a1x0 = [0.1] # Small initial deceleration\n", + "\n", + " # Bounds: a1x in [0, max] to ensure non-negative braking\n", + " bounds = [(0.0, 10000000)] # Prevents infinite braking\n", + "\n", + " # Solve optimization problem\n", + " res = minimize(\n", + " objective,\n", + " a1x0,\n", + " bounds=bounds,\n", + " constraints={'type': 'ineq', 'fun': constraint_no_collision, 'args': (P1, P2, V1, V2, r1, r2, t_final)},\n", + " method=\"SLSQP\"\n", + " )\n", + "\n", + " # Optimal deceleration\n", + " if res.success:\n", + " optimal_a1x = res.x[0]\n", + " print(f\"Optimal Deceleration in x-direction: {optimal_a1x:.4f}\")\n", + " else:\n", + " print(\"No feasible solution found. Consider adjusting parameters.\")\n", + " else:\n", + " print(\"No collision detected. No braking required.\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Collision detected. Computing optimal braking force...\n", + "Estimated time to collision: 0.02 seconds\n", + "No feasible solution found. Consider adjusting parameters.\n" + ] + } + ], + "source": [ + "import numpy as np\n", + "from scipy.optimize import minimize\n", + "\n", + "def distance(P1, P2):\n", + " \"\"\" Compute Euclidean distance between two points \"\"\"\n", + " return np.linalg.norm(P1 - P2)\n", + "\n", + "def detect_collision(P1, P2, V1, V2, r1, r2):\n", + " \"\"\"\n", + " Checks if a collision is possible based on initial relative motion.\n", + " \"\"\"\n", + " delta_p = P2 - P1 # Displacement vector\n", + " delta_v = V2 - V1 # Relative velocity\n", + "\n", + " # Check if objects are moving toward each other\n", + " return np.dot(delta_p, delta_v) < 0\n", + "\n", + "def estimate_collision_time(P1, P2, V1, V2, r1, r2):\n", + " \"\"\"\n", + " Estimates the time of closest approach (if collision is possible).\n", + " \"\"\"\n", + " delta_p = P2 - P1\n", + " delta_v = V2 - V1\n", + "\n", + " if np.linalg.norm(delta_v) < 1e-6:\n", + " return np.inf # If objects have no relative velocity, they won't collide\n", + "\n", + " t_c = -np.dot(delta_p, delta_v) / np.linalg.norm(delta_v)**2\n", + "\n", + " if t_c < 0:\n", + " return np.inf # If t_c is negative, the objects were closer in the past\n", + "\n", + " return t_c\n", + "\n", + "def constraint_no_collision(a1x, P1, P2, V1, V2, r1, r2):\n", + " \"\"\"\n", + " Ensures Object 1 does not collide with Object 2 during movement.\n", + " \"\"\"\n", + "\n", + " # Estimate time of closest approach\n", + " # t_c = estimate_collision_time(P1, P2, V1, V2, r1, r2)\n", + " # if t_c == np.inf:\n", + " # return 1 # No collision risk\n", + "\n", + " # Simulate motion for multiple time steps up to t_c\n", + " time_steps = np.linspace(0, min(500, 5), num=10000) # Consider max 5 seconds\n", + "\n", + " for t in time_steps:\n", + " # Update Object 1's position (only decelerating in x-direction)\n", + " new_x = P1[0] + V1[0] * t - 0.5 * a1x[0] * t**2 # Apply braking in x direction\n", + " new_y = P1[1] + V1[1] * t # Y-direction remains unchanged\n", + " P1_new = np.array([new_x, new_y])\n", + "\n", + " # Update Object 2's position (continues moving)\n", + " P2_new = P2 + V2 * t\n", + "\n", + " # Check if collision occurs\n", + " d = distance(P1_new, P2_new) - (r1 + r2)\n", + "\n", + " if d < 0: # Collision detected\n", + " return d # Negative value signals violation\n", + "\n", + " return 1 # Constraint satisfied\n", + "\n", + "def objective(a1x):\n", + " \"\"\" Objective function: minimize deceleration a1x \"\"\"\n", + " return a1x # Minimize braking force\n", + "\n", + "if __name__ == \"__main__\":\n", + " # Define initial conditions\n", + " P1 = np.array([0, 0]) # Initial position of Object 1\n", + " V1 = np.array([500, 500]) # Initial velocity of Object 1\n", + " P2 = np.array([10, 10]) # Initial position of Object 2\n", + " V2 = np.array([1, 1]) # Velocity of Object 2\n", + " r1, r2 = 1, 1 # Radii of the objects\n", + "\n", + " # Check if a collision is even possible before optimization\n", + " if detect_collision(P1, P2, V1, V2, r1, r2):\n", + " print(\"Collision detected. Computing optimal braking force...\")\n", + "\n", + " # Estimate time to collision\n", + " t_final = estimate_collision_time(P1, P2, V1, V2, r1, r2)\n", + "\n", + " if t_final == np.inf:\n", + " print(\"No risk of collision. No braking required.\")\n", + " else:\n", + " print(f\"Estimated time to collision: {t_final:.2f} seconds\")\n", + "\n", + " # Initial guess for deceleration\n", + " a1x0 = [0.1] # Small initial deceleration\n", + "\n", + " # Bounds: a1x in [0, 50] to ensure non-negative braking\n", + " bounds = [(0.0, 50)] # Allow strong braking if needed\n", + "\n", + " # Solve optimization problem\n", + " res = minimize(\n", + " objective,\n", + " a1x0,\n", + " bounds=bounds,\n", + " constraints={'type': 'ineq', 'fun': constraint_no_collision, 'args': (P1, P2, V1, V2, r1, r2)},\n", + " method=\"SLSQP\"\n", + " )\n", + "\n", + " # Optimal deceleration\n", + " if res.success:\n", + " optimal_a1x = res.x[0]\n", + " print(f\"Optimal Deceleration in x-direction: {optimal_a1x:.4f}\")\n", + " else:\n", + " print(\"No feasible solution found. Consider adjusting parameters.\")\n", + " else:\n", + " print(\"No collision detected. No braking required.\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/launch/pedestrian_detection.yaml b/launch/pedestrian_detection.yaml index 524e109dc..62dd8f202 100644 --- a/launch/pedestrian_detection.yaml +++ b/launch/pedestrian_detection.yaml @@ -23,7 +23,11 @@ drive: type: longitudinal_planning.YieldTrajectoryPlanner args: mode: 'real' - planner: 'dt' # 'milestone', 'dt', or 'dx' + params: { + 'planner': 'dt', # 'milestone', 'dt', or 'dx' + 'desired_speed': 1.0, # m/s, 1.5 m/s seems max speed? Feb24 + 'acceleration': 0.75 # m/s2, 0.5 is not enough to start moving. Feb24 + } trajectory_tracking: type: pure_pursuit.PurePursuitTrajectoryTracker print: False diff --git a/scenes/xyhead_demo.yaml b/scenes/xyhead_demo.yaml index 2dbc6c267..2aa1059ed 100644 --- a/scenes/xyhead_demo.yaml +++ b/scenes/xyhead_demo.yaml @@ -8,4 +8,10 @@ agents: target: [15.0,10.0] # target: [10.0,10.0] behavior: loop - \ No newline at end of file + + # ped2: + # type: pedestrian + # position: [15.0, 0.0] + # nominal_velocity: 0.25 + # target: [15.0,10.0] + # behavior: loop