diff --git a/GEMstack/knowledge/vehicle/gem_e4_sensor_environment.yaml b/GEMstack/knowledge/vehicle/gem_e4_sensor_environment.yaml index b9e719261..af509af66 100644 --- a/GEMstack/knowledge/vehicle/gem_e4_sensor_environment.yaml +++ b/GEMstack/knowledge/vehicle/gem_e4_sensor_environment.yaml @@ -2,4 +2,5 @@ ros_topics: top_lidar: /ouster/points front_camera: /oak/rgb/image_raw front_depth: /oak/stereo/image_raw - gnss: /novatel/inspva + gnss: /septentrio_gnss/insnavgeod + diff --git a/GEMstack/onboard/interface/gem_hardware.py b/GEMstack/onboard/interface/gem_hardware.py index fc6431f21..d50a38a0f 100644 --- a/GEMstack/onboard/interface/gem_hardware.py +++ b/GEMstack/onboard/interface/gem_hardware.py @@ -1,7 +1,7 @@ from .gem import * from ...utils import settings import math - +import time # ROS Headers import rospy from std_msgs.msg import String, Bool, Float32, Float64 @@ -178,6 +178,7 @@ def callback_with_gnss_reading(inspva_msg: Inspva): else: def callback_with_gnss_reading(msg: INSNavGeod): pose = ObjectPose(ObjectFrameEnum.GLOBAL, + t = time.time(), x=msg.longitude, y=msg.latitude, z=msg.height, @@ -185,6 +186,7 @@ def callback_with_gnss_reading(msg: INSNavGeod): roll=math.radians(msg.roll), pitch=math.radians(msg.pitch), ) + # print("@@@@@, POSE", pose.x, pose.y) speed = np.sqrt(msg.ve**2 + msg.vn**2) callback(GNSSReading(pose,speed,('error' if msg.error else 'ok'))) self.gnss_sub = rospy.Subscriber(topic, INSNavGeod, callback_with_gnss_reading) diff --git a/GEMstack/onboard/perception/state_estimation.py b/GEMstack/onboard/perception/state_estimation.py index db96ad38f..4d19d4fa4 100644 --- a/GEMstack/onboard/perception/state_estimation.py +++ b/GEMstack/onboard/perception/state_estimation.py @@ -37,8 +37,8 @@ def state_outputs(self) -> List[str]: return ['vehicle'] def healthy(self): - return True - # return self.gnss_pose is not None TODO: fix + # return True + return self.gnss_pose is not None def update(self) -> VehicleState: if self.gnss_pose is None: diff --git a/GEMstack/onboard/planning/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py index d967d688e..a75fb56c3 100644 --- a/GEMstack/onboard/planning/longitudinal_planning.py +++ b/GEMstack/onboard/planning/longitudinal_planning.py @@ -10,11 +10,224 @@ import matplotlib.patches as patches import math -def detect_collision(curr_x: float, curr_y: float, curr_v: float, obj: AgentState, min_deceleration: float, max_deceleration: float) -> Tuple[bool, float]: + + +################################################################################ +########## Collisiong Detection ################################################ +################################################################################ + + + +class CollisionDetector: + """ + Simulation class to update positions of two rectangles (vehicle and pedestrian) + with velocities v1 and v2, performing collision detection at each time step. + All functions remain within the class, and variables defined in __init__ remain unchanged; + local copies are used during simulation. + """ + def __init__(self, x1, y1, t1, x2, y2, t2, v1, v2, total_time=10.0, desired_speed=2.0, acceleration=0.5): + + self.vehicle_x = x1 + self.vehicle_y = y1 + self.pedestrian_x = x2 + self.pedestrian_y = y2 + + # Vehicle parameters with buffer adjustments + self.vehicle_size_x = 3.2 + self.vehicle_size_y = 1.7 + self.vehicle_buffer_x = 3.0 + 1.0 + self.vehicle_buffer_y = 2.0 + + # Vehicle rectangle + self.x1 = self.vehicle_x + (self.vehicle_size_x + self.vehicle_buffer_x)*0.5 # Offset for buffer (remains constant) + self.y1 = self.vehicle_y + self.w1 = self.vehicle_size_x + self.vehicle_buffer_x # Increase width with buffer + self.h1 = self.vehicle_size_y + self.vehicle_buffer_y # Increase height with buffer + self.t1 = t1 + + # Pedestrian rectangle + self.x2 = x2 + self.y2 = y2 + self.w2 = 0.5 + self.h2 = 0.5 + self.t2 = t2 + + # Velocities are expected as [vx, vy] + self.v1 = v1 + self.v2 = v2 + + self.dt = 0.1 # seconds + self.total_time = total_time # seconds + + self.desired_speed = desired_speed + self.acceleration = acceleration + + def set_params(self, speed, acceleration): + self.desired_speed = speed + self.acceleration = acceleration + + def get_corners(self, x, y, w, h, theta): + """ + Returns the 4 corners of a rotated rectangle. + (x, y): center of rectangle + w, h: width and height of rectangle + theta: rotation angle in radians + """ + cos_t = math.cos(theta) + sin_t = math.sin(theta) + hw, hh = w / 2.0, h / 2.0 + + corners = np.array([ + [ hw * cos_t - hh * sin_t, hw * sin_t + hh * cos_t], + [-hw * cos_t - hh * sin_t, -hw * sin_t + hh * cos_t], + [-hw * cos_t + hh * sin_t, -hw * sin_t - hh * cos_t], + [ hw * cos_t + hh * sin_t, hw * sin_t - hh * cos_t] + ]) + + corners[:, 0] += x + corners[:, 1] += y + + return corners + + def get_axes(self, rect): + axes = [] + for i in range(len(rect)): + p1 = rect[i] + p2 = rect[(i + 1) % len(rect)] + edge = p2 - p1 + normal = np.array([-edge[1], edge[0]]) + norm = np.linalg.norm(normal) + if norm: + normal /= norm + axes.append(normal) + return axes + + def project(self, rect, axis): + dots = np.dot(rect, axis) + return np.min(dots), np.max(dots) + + def sat_collision(self, rect1, rect2): + """ + Determines if two convex polygons (rectangles) collide using the + Separating Axis Theorem (SAT). + rect1 and rect2 are numpy arrays of shape (4,2) representing their corners. + """ + axes1 = self.get_axes(rect1) + axes2 = self.get_axes(rect2) + + for axis in axes1 + axes2: + min1, max1 = self.project(rect1, axis) + min2, max2 = self.project(rect2, axis) + if max1 < min2 or max2 < min1: + return False + return True + + def plot_rectangles(self, rect1, rect2, collision, ax): + """ + Plot two rectangles on a provided axis and indicate collision by color. + """ + color = 'red' if collision else 'blue' + for rect in [rect1, rect2]: + polygon = patches.Polygon(rect, closed=True, edgecolor=color, fill=False, linewidth=2) + ax.add_patch(polygon) + ax.scatter(rect[:, 0], rect[:, 1], color=color, zorder=3) + ax.set_title(f"Collision: {'Yes' if collision else 'No'}") + + def run(self, is_displayed=False): + collision_distance = -1 + steps = int(self.total_time / self.dt) + 1 + + # Create local variables for positions; these will be updated + # without modifying the __init__ attributes. + current_x1 = self.x1 + current_y1 = self.y1 + current_x2 = self.x2 + current_y2 = self.y2 + current_v1 = self.v1[0] + + if is_displayed: + plt.ion() # Turn on interactive mode + fig, ax = plt.subplots(figsize=(10,5)) + + for i in range(steps): + t_sim = i * self.dt + + # Compute rectangle corners using the local positional variables. + rect1 = self.get_corners(current_x1, current_y1, self.w1, self.h1, self.t1) + rect2 = self.get_corners(current_x2, current_y2, self.w2, self.h2, self.t2) + + # Perform collision detection. + collision = self.sat_collision(rect1, rect2) + + if is_displayed: + # Plot the current step. + ax.clear() + self.plot_rectangles(rect1, rect2, collision, ax) + ax.set_aspect('equal') + ax.grid(True, linestyle='--', alpha=0.5) + ax.set_xlim(self.vehicle_x - 5, self.vehicle_x + 20) + ax.set_ylim(self.vehicle_y -5, self.vehicle_y +5) + + # Draw an additional rectangle (vehicle body) at the vehicle's center. + rect_vehiclebody = patches.Rectangle( + (current_x1 - (self.vehicle_size_x + self.vehicle_buffer_x)*0.5, current_y1 - self.vehicle_size_y * 0.5), + self.w1 - self.vehicle_buffer_x, + self.h1 - self.vehicle_buffer_y, + edgecolor='green', + fill=False, + linewidth=2, + linestyle='--' + ) + ax.add_patch(rect_vehiclebody) + + ax.text(0.01, 0.99, f"t = {t_sim:.1f}s", fontsize=12, transform=ax.transAxes, verticalalignment='top') + plt.draw() + + # Pause briefly to simulate real-time updating. + plt.pause(self.dt * 0.05) + + # Stop simulation if collision is detected. + if collision: + current_vehicle_x = current_x1 - (self.vehicle_size_x + self.vehicle_buffer_x) * 0.5 + current_vehicle_y = current_y1 + collision_distance = current_vehicle_x - self.vehicle_x + break + + # Update the vehicle's speed if it is not at the desired speed. + next_v = current_v1 + self.acceleration * self.dt + # Accelerating: Check if the vehicle is about to exceed the desired speed. + if next_v > self.desired_speed and current_v1 <= self.desired_speed: + current_v1 = self.desired_speed + # Decelerating: Check if the vehicle is about to fall below the desired speed. + elif next_v < self.desired_speed and current_v1 >= self.desired_speed: + current_v1 = self.desired_speed + else: + current_v1 = next_v + + current_v1 = 0.0 if current_v1 < 0.0 else current_v1 + + # Update local positions based on velocities. + current_x1 += current_v1 * self.dt + current_y1 += self.v1[1] * self.dt + current_x2 += self.v2[0] * self.dt + current_y2 += self.v2[1] * self.dt + + if is_displayed: + plt.ioff() + plt.show(block=True) + + # print("Collision distance:", collision_distance) + + return collision_distance + + + +def detect_collision(curr_x: float, curr_y: float, curr_v: float, obj: AgentState, min_deceleration: float, max_deceleration: float, acceleration: float, max_speed: float) -> Tuple[bool, float]: """Detects if a collision will occur with the given object and return deceleration to avoid it.""" # Get the object's position and velocity obj_x = obj.pose.x + # obj_x = 0.0 obj_y = obj.pose.y obj_v_x = obj.velocity[0] obj_v_y = obj.velocity[1] @@ -50,7 +263,7 @@ def detect_collision(curr_x: float, curr_y: float, curr_v: float, obj: AgentStat # The object is to the side of the vehicle print("Object is to the side of the vehicle") return False, 0.0 - # The object overlaps with the vehicle + # The object overlaps with the vehicle's buffer return True, max_deceleration if vehicle_right - vehicle_buffer_y > pedestrian_left and obj_v_y <= 0: @@ -63,65 +276,95 @@ def detect_collision(curr_x: float, curr_y: float, curr_v: float, obj: AgentStat print("Object is to the left of the vehicle and moving away") return False, 0.0 - if vehicle_front + vehicle_buffer_x >= pedestrian_back: + if vehicle_front + vehicle_buffer_x >= pedestrian_back and (vehicle_right - vehicle_buffer_y <= pedestrian_left and vehicle_left + vehicle_buffer_y >= pedestrian_right): # The object is in front of the vehicle and within the buffer print("Object is in front of the vehicle and within the buffer") return True, max_deceleration # Calculate the deceleration needed to avoid a collision print("Object is in front of the vehicle and outside the buffer") - distance = pedestrian_back - vehicle_front - vehicle_buffer_x + distance = pedestrian_back - vehicle_front + distance_with_buffer = pedestrian_back - vehicle_front - vehicle_buffer_x relative_v = curr_v - obj_v_x if relative_v <= 0: return False, 0.0 - - print(relative_v, distance) - if obj_v_y > 0 and ((obj_y - curr_y) / relative_v) < ((vehicle_right - vehicle_buffer_y - yield_buffer_y - pedestrian_left) / abs(obj_v_y)): - # The object is to the right of the vehicle and moving towards it, but the vehicle will pass before the object reaches the vehicle - print("The object is to the right of the vehicle and moving towards it, but the vehicle will pass before the object reaches the vehicle") - return False, 0.0 - if obj_v_y < 0 and ((obj_y - curr_y) / relative_v) < ((pedestrian_right - vehicle_left - vehicle_buffer_y - yield_buffer_y) / abs(obj_v_y)): - # The object is to the left of the vehicle and moving towards it, but the vehicle will pass before the object reaches the vehicle - print("The object is to the left of the vehicle and moving towards it, but the vehicle will pass before the object reaches the vehicle") - return False, 0.0 + if obj_v_y == 0: + # The object is in front of the vehicle blocking it + deceleration = relative_v ** 2 / (2 * distance_with_buffer) + if deceleration > max_deceleration: + return True, max_deceleration + if deceleration < min_deceleration: + return False, 0.0 + + return True, deceleration - if obj_v_y != 0: - if obj_v_y < 0: - # The object is moving toward the right side of the vehicle - distance_to_pass = obj_y - (vehicle_right - vehicle_buffer_y - yield_buffer_y) + pedestrian_width / 2 - elif obj_v_y > 0: - # The object is moving toward the left side of the vehicle - distance_to_pass = (vehicle_left + vehicle_buffer_y + yield_buffer_y) - obj_y + pedestrian_width / 2 + print(relative_v, distance_with_buffer) - time_to_pass = distance_to_pass / abs(obj_v_y) + if obj_v_y > 0: + # The object is to the right of the vehicle and moving towards it + time_to_get_close = (vehicle_right - vehicle_buffer_y - yield_buffer_y - pedestrian_left) / abs(obj_v_y) + time_to_pass = (vehicle_left + vehicle_buffer_y + yield_buffer_y - pedestrian_right) / abs(obj_v_y) + else: + # The object is to the left of the vehicle and moving towards it + time_to_get_close = (pedestrian_right - vehicle_left - vehicle_buffer_y - yield_buffer_y) / abs(obj_v_y) + time_to_pass = (pedestrian_left - vehicle_right + vehicle_buffer_y + yield_buffer_y) / abs(obj_v_y) - distance_to_move = pedestrian_back - vehicle_front - vehicle_buffer_x + time_to_pass * obj_v_y + time_to_accel_to_max_speed = (max_speed - curr_v) / acceleration + distance_to_accel_to_max_speed = (max_speed + curr_v - 2 * obj_v_x) * time_to_accel_to_max_speed / 2 #area of trapezoid + + if distance_to_accel_to_max_speed > distance_with_buffer: + # The object will reach the buffer before reaching max speed + time_to_buffer_when_accel = (-relative_v + (relative_v * relative_v + 2 * distance_with_buffer * acceleration) ** 0.5) / acceleration + else: + # The object will reach the buffer after reaching max speed + time_to_buffer_when_accel = time_to_accel_to_max_speed + (distance_with_buffer - distance_to_accel_to_max_speed) / (max_speed - obj_v_x) + if distance_to_accel_to_max_speed > distance: + # We will collide before reaching max speed + time_to_collide_when_accel = (-relative_v + (relative_v * relative_v + 2 * distance * acceleration) ** 0.5) / acceleration + else: + # We will collide after reaching max speed + time_to_collide_when_accel = time_to_accel_to_max_speed + (distance - distance_to_accel_to_max_speed) / (max_speed - obj_v_x) - # if curr_v**2/2*distance_to_pass >= 1.5: - # return True, curr_v**2/2*distance_to_pass - time_to_max_v = (5 - curr_v)/0.5 + if time_to_get_close > time_to_collide_when_accel: + # We can do normal driving and will pass the object before it gets in our way + print("We can do normal driving and will pass the object before it gets in our way") + return False, 0.0 - if time_to_max_v > time_to_pass: - if curr_v*time_to_pass + 0.25*time_to_pass**2 > distance_to_move: - return False, 0.0 - else: - if (25 - curr_v**2)*4 + (time_to_pass - time_to_max_v) * 5 >= distance_to_move: - return False, 0.0 + if vehicle_front + vehicle_buffer_x >= pedestrian_back: + # We cannot move pass the pedestrian before it reaches the buffer from side + return True, max_deceleration + if time_to_pass < time_to_buffer_when_accel: + # The object will pass through our front before we drive normally and reach it + print("The object will pass through our front before we drive normally and reach it") + return False, 0.0 - return True, [distance_to_move, time_to_pass] + distance_to_move = distance_with_buffer + time_to_pass * obj_v_y + # if curr_v**2/2*distance_to_pass >= 1.5: + # return True, curr_v**2/2*distance_to_pass + time_to_max_v = (5 - curr_v)/0.5 + + if time_to_max_v > time_to_pass: + if curr_v*time_to_pass + 0.25*time_to_pass**2 > distance_to_move: + return False, 0.0 else: - deceleration = relative_v ** 2 / (2 * distance) - if deceleration > max_deceleration: - return True, max_deceleration - if deceleration < min_deceleration: + if (25 - curr_v**2)*4 + (time_to_pass - time_to_max_v) * 5 >= distance_to_move: return False, 0.0 - return True, deceleration + + return True, [distance_to_move, time_to_pass] + + + +################################################################################ +########## Longitudinal Planning ############################################### +################################################################################ + + def longitudinal_plan(path : Path, acceleration : float, deceleration : float, max_speed : float, current_speed : float, method : str) -> Trajectory: @@ -187,7 +430,7 @@ def longitudinal_plan_milestone(path : Path, acceleration : float, deceleration new_points.append(cur_point) new_times.append(cur_time) velocities.append(current_speed) - print("=====================================") + # print("=====================================") # print("new points: ", new_points) # print("current index: ", cur_index) # print("current speed: ", current_speed) @@ -829,21 +1072,158 @@ def longitudinal_brake(path : Path, deceleration : float, current_speed : float) trajectory = Trajectory(path.frame,points,times,velocities) return trajectory + + +################################################################################ +########## Yield Trajectory Planner ############################################ +################################################################################ + + + +# class YieldTrajectoryPlanner(Component): +# """Follows the given route. Brakes if you have to yield or +# you are at the end of the route, otherwise accelerates to +# the desired speed. +# """ + +# def __init__(self, mode : str = 'real', planner : str = 'milestone'): +# self.route_progress = None +# self.t_last = None +# self.acceleration = 0.5 +# self.desired_speed = 1.0 +# self.deceleration = 2.0 + +# self.min_deceleration = 1.0 +# self.max_deceleration = 8.0 + +# self.mode = mode +# self.planner = planner + +# def state_inputs(self): +# return ['all'] + +# def state_outputs(self) -> List[str]: +# return ['trajectory'] + +# def rate(self): +# return 10.0 + +# def update(self, state : AllState): +# start_time = time.time() + +# vehicle = state.vehicle # type: VehicleState +# route = state.route # type: Route +# t = state.t + +# if self.t_last is None: +# self.t_last = t +# dt = t - self.t_last + +# # Position in vehicle frame (Start (0,0) to (15,0)) +# curr_x = vehicle.pose.x +# curr_y = vehicle.pose.y +# curr_v = vehicle.v + +# abs_x = curr_x + state.start_vehicle_pose.x +# abs_y = curr_y + state.start_vehicle_pose.y + +# ############################################### +# print("@@@@@ VEHICLE STATE @@@@@") +# print(vehicle) +# print("@@@@@@@@@@@@@@@@@@@@@@@@@") + +# if self.mode == 'real': +# # Position in vehicle frame (Start (0,0) to (15,0)) +# curr_x = vehicle.pose.x * 20 +# curr_y = vehicle.pose.y * 20 +# curr_v = vehicle.v +# # print("@@@@@ PLAN", curr_x, curr_y, curr_v) +# abs_x = curr_x +# abs_y = curr_y +# # print("@@@@@ PLAN", abs_x, abs_y) +# ############################################### + +# #figure out where we are on the route +# if self.route_progress is None: +# self.route_progress = 0.0 +# closest_dist,closest_parameter = state.route.closest_point_local((curr_x,curr_y),[self.route_progress-5.0,self.route_progress+5.0]) +# self.route_progress = closest_parameter + +# lookahead_distance = max(10, curr_v**2 / (2 * self.deceleration)) +# route_with_lookahead = route.trim(closest_parameter,closest_parameter + lookahead_distance) +# print("Lookahead distance:", lookahead_distance) + +# route_to_end = route.trim(closest_parameter, len(route.points) - 1) + +# should_yield = False +# yield_deceleration = 0.0 + +# print("Current Speed: ", curr_v) + +# for r in state.relations: +# if r.type == EntityRelationEnum.YIELDING and r.obj1 == '': +# #get the object we are yielding to +# obj = state.agents[r.obj2] + +# if self.mode == 'real': +# obj.pose.x = obj.pose.x + curr_x +# obj.pose.y = obj.pose.y + curr_y + +# detected, deceleration = detect_collision(abs_x, abs_y, curr_v, obj, self.min_deceleration, self.max_deceleration, self.acceleration, self.desired_speed) +# if isinstance(deceleration, list): +# print("@@@@@ INPUT", deceleration) +# time_collision = deceleration[1] +# distance_collision = deceleration[0] +# b = 3*time_collision - 2*curr_v +# c = curr_v**2 - 3*distance_collision +# desired_speed = (-b + (b**2 - 4*c)**0.5)/2 +# deceleration = 1.5 +# print("@@@@@ YIELDING", desired_speed) +# route_with_lookahead = route.trim(closest_parameter,closest_parameter + distance_collision) +# traj = longitudinal_plan(route_with_lookahead, self.acceleration, deceleration, desired_speed, curr_v) +# return traj +# else: +# if detected and deceleration > 0: +# yield_deceleration = max(deceleration, yield_deceleration) +# should_yield = True + +# print("should yield: ", should_yield) + +# should_accelerate = (not should_yield and curr_v < self.desired_speed) + +# #choose whether to accelerate, brake, or keep at current velocity +# if should_accelerate: +# traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, curr_v, "milestone") +# elif should_yield: +# traj = longitudinal_brake(route_with_lookahead, yield_deceleration, curr_v) +# else: +# traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v, "dt") + +# return traj + + class YieldTrajectoryPlanner(Component): """Follows the given route. Brakes if you have to yield or you are at the end of the route, otherwise accelerates to the desired speed. """ - def __init__(self): + def __init__(self, mode : str = 'real', planner : str = 'milestone'): self.route_progress = None self.t_last = None - self.acceleration = 0.5 - self.desired_speed = 1.0 - self.deceleration = 2.0 + self.acceleration = 0.75 # 0.5 is not enough to start + self.desired_speed = 1.0 # cannot got more than 1.5 m/s + + # Yielding parameters + # Yielding speed [..., 1.0, 0.8, ..., 0.2] + self.yield_speed = [v for v in np.arange(self.desired_speed, 0.1, -0.25)] + self.yield_deceleration = 0.5 + self.deceleration = 2.0 + self.max_deceleration = 8.0 - self.min_deceleration = 1.0 - self.max_deceleration = 8.0 + # Planner mode + self.mode = mode + self.planner = planner def state_inputs(self): return ['all'] @@ -865,7 +1245,7 @@ def update(self, state : AllState): self.t_last = t dt = t - self.t_last - # Position in vehicle frame (Start (0,0) to (15,0)) + curr_x = vehicle.pose.x curr_y = vehicle.pose.y curr_v = vehicle.v @@ -873,56 +1253,187 @@ def update(self, state : AllState): abs_x = curr_x + state.start_vehicle_pose.x abs_y = curr_y + state.start_vehicle_pose.y + ############################################### + # # TODO: Fix the coordinate conversion of other files + + print("@@@@@ VEHICLE STATE @@@@@") + print(vehicle) + print("@@@@@@@@@@@@@@@@@@@@@@@@@") + + if self.mode == 'real': + # Position in vehicle frame (Start (0,0) to (15,0)) + curr_x = vehicle.pose.x * 20 + curr_y = vehicle.pose.y * 20 + curr_v = vehicle.v + # print("@@@@@ PLAN", curr_x, curr_y, curr_v) + abs_x = curr_x + abs_y = curr_y + # print("@@@@@ PLAN", abs_x, abs_y) + ############################################### + + #figure out where we are on the route if self.route_progress is None: self.route_progress = 0.0 closest_dist,closest_parameter = state.route.closest_point_local((curr_x,curr_y),[self.route_progress-5.0,self.route_progress+5.0]) self.route_progress = closest_parameter - lookahead_distance = max(10, curr_v**2 / (2 * self.deceleration)) + lookahead_distance = max(10, curr_v**2 / (2 * self.yield_deceleration)) route_with_lookahead = route.trim(closest_parameter,closest_parameter + lookahead_distance) print("Lookahead distance:", lookahead_distance) + #extract out a 10m segment of the route + # route_with_lookahead = route.trim(closest_parameter,closest_parameter+10.0) - route_to_end = route.trim(closest_parameter, len(route.points) - 1) - - should_yield = False - yield_deceleration = 0.0 - print("Current Speed: ", curr_v) + # Default values + should_brake = False + desired_speed = self.desired_speed + accel = self.acceleration + decel = self.deceleration for r in state.relations: if r.type == EntityRelationEnum.YIELDING and r.obj1 == '': - #get the object we are yielding to - obj = state.agents[r.obj2] - - detected, deceleration = detect_collision(abs_x, abs_y, curr_v, obj, self.min_deceleration, self.max_deceleration) - if isinstance(deceleration, list): - print("@@@@@ INPUT", deceleration) - time_collision = deceleration[1] - distance_collision = deceleration[0] - b = 3*time_collision - 2*curr_v - c = curr_v**2 - 3*distance_collision - desired_speed = (-b + (b**2 - 4*c)**0.5)/2 - deceleration = 1.5 - print("@@@@@ YIELDING", desired_speed) - route_with_lookahead = route.trim(closest_parameter,closest_parameter + distance_collision) - traj = longitudinal_plan(route_with_lookahead, self.acceleration, deceleration, desired_speed, curr_v) - return traj - else: - if detected and deceleration > 0: - yield_deceleration = max(deceleration, yield_deceleration) - should_yield = True - - print("should yield: ", should_yield) + #yielding to something, brake + + #========================= + """ + Collision detection: + - Compute the lookahead distance required to avoid collision using: + d = v^2/(2*a) + - For many steps along the route (using a resolution that adapts if the + planner runs too slowly), simulate the vehicle's future positions. + - If a pedestrian is detected within 3m longitudinal and 1m lateral buffer, + determine the distance-to-collision. Then compute the required deceleration: + a = -(v^2)/(2*d_collision) + - For distant crossing pedestrians, apply a gentle deceleration based on the + perception-estimated pedestrian velocity. + """ + + print("#### YIELDING PLANNING ####") + + # Vehicle parameters. + x1, y1 = abs_x, abs_y + v1 = [curr_v, 0] # Vehicle speed vector + + for n,a in state.agents.items(): + + """ + class ObjectFrameEnum(Enum): + START = 0 #position / yaw in m / radians relative to starting pose of vehicle + CURRENT = 1 #position / yaw in m / radians relative to current pose of vehicle + GLOBAL = 2 #position in longitude / latitude, yaw=heading in radians with respect to true north (used in GNSS) + ABSOLUTE_CARTESIAN = 3 #position / yaw in m / radians relative to a global cartesian reference frame (used in simulation) + """ + print("@@@@@ AGENT STATE @@@@@") + print(a) + print("@@@@@@@@@@@@@@@@@@@@@@@") + + # Pedestrian parameters. + x2, y2 = a.pose.x, a.pose.y + v2 = [a.velocity[0], a.velocity[1]] # Pedestrian speed vector + + if self.mode == 'real': + x2 = a.pose.x + curr_x + y2 = a.pose.y + curr_y + + # Total simulation time + if v1[0] > 0.1: + total_time = min(10, lookahead_distance / v1[0]) + else: + total_time = 10 + print(f"Total time: {total_time:.2f} seconds") + + # Create and run the simulation. + print(f"Vehicle: ({x1:.1f}, {y1:.1f}, ({v1[0]:.1f}, {v1[1]:.1f}))") + print(f"Pedestrian: ({x2:.1f}, {y2:.1f}, ({v2[0]:.1f}, {v2[1]:.1f}))") + + # Simulate if a collision will occur when the vehicle accelerate to desired speed. + sim = CollisionDetector(x1, y1, 0, x2, y2, 0, v1, v2, total_time, desired_speed, accel) + collision_distance = sim.run() + + # No collision detected with acceleration to desired speed. + if collision_distance < 0: + print("No collision detected.") + + # Collision detected with acceleration to desired speed. + # => Check if the vehicle can yield to the pedestrian with deceleration. + else: + + ############################################### + # # UNCOMMENT NOT TO YIELD: JUST STOP FOR PART1 + # print("The vehicle is Stopping.") + # # Decide the deceleration based on the collision distance. + # brake_deceleration = max(self.deceleration, v1[0]**2 / (2 * (collision_distance))) + # if brake_deceleration > self.deceleration: + # if brake_deceleration > self.max_deceleration: + # brake_deceleration = self.max_deceleration + # decel = brake_deceleration if brake_deceleration > decel else decel + # should_brake = True + # break + # break + ############################################### + + print("Collision detected. Try to find yielding speed.") + # Update lookahead distance to pedestrian. + route_with_lookahead = route.trim(closest_parameter, closest_parameter + collision_distance) + + collision_distance_after_yield = -1 + + # Simulate with different yield speeds. + # Try to maximize the yield speed to avoid collision. + for v in self.yield_speed: + # Simulate if the vehicle can yield to the pedestrian with acceleration to yielding speed. + if v > v1[0]: + sim.set_params(v, accel) + # Simulate if the vehicle can yield to the pedestrian with deceleration to yielding speed. + else: + sim.set_params(v, self.yield_deceleration * -1.0) + collision_distance_after_yield = sim.run() + if collision_distance_after_yield < 0: + print(f"Yielding at speed: {v}") + desired_speed = v if v < desired_speed else desired_speed + decel = self.yield_deceleration if self.yield_deceleration > decel else decel + break + + # Collision detected with any yielding speed. + # => Brake to avoid collision. + if collision_distance_after_yield > 0: + print("The vehicle is Stopping.") + # Decide the deceleration based on the collision distance. + brake_deceleration = max(self.deceleration, v1[0]**2 / (2 * (collision_distance))) + if brake_deceleration > self.max_deceleration: + brake_deceleration = self.max_deceleration + decel = brake_deceleration if brake_deceleration > decel else decel + should_brake = True + + break + + # # UNCOMMENT TO BRAKE FOR ALL PEDESTRIANS + # should_brake = True + # desired_speed = 0.0 + # decel = self.deceleration + + # # UNCOMMENT NOT TO BRAKE + # should_brake = False + # desired_speed = self.desired_speed + # decel = self.deceleration + + #========================= - should_accelerate = (not should_yield and curr_v < self.desired_speed) + print(f"Desired speed: {desired_speed:.2f} m/s") + print(f"Deceleration: {decel:.2f} m/s^2") + should_accelerate = (not should_brake and curr_v < self.desired_speed) + + # traj = longitudinal_plan(route_to_end, self.acceleration, self.deceleration, self.desired_speed, curr_v, "milestone") #choose whether to accelerate, brake, or keep at current velocity if should_accelerate: - traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, curr_v, "dt") - elif should_yield: - traj = longitudinal_brake(route_with_lookahead, yield_deceleration, curr_v) + traj = longitudinal_plan(route_with_lookahead, accel, decel, desired_speed, curr_v, self.planner) + elif should_brake: + traj = longitudinal_brake(route_with_lookahead, decel, curr_v) else: - traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v, "dt") + traj = longitudinal_plan(route_with_lookahead, 0.0, decel, desired_speed, curr_v, self.planner) + + print(f"Simulation took {time.time() - start_time:.3f} seconds.") return traj diff --git a/launch/pedestrian_detection.yaml b/launch/pedestrian_detection.yaml index 98f49a9aa..524e109dc 100644 --- a/launch/pedestrian_detection.yaml +++ b/launch/pedestrian_detection.yaml @@ -19,7 +19,11 @@ drive: route_planning: type: StaticRoutePlanner args: [!relative_path '../GEMstack/knowledge/routes/forward_15m.csv','start'] - motion_planning: longitudinal_planning.YieldTrajectoryPlanner + motion_planning: + type: longitudinal_planning.YieldTrajectoryPlanner + args: + mode: 'real' + planner: 'dt' # 'milestone', 'dt', or 'dx' trajectory_tracking: type: pure_pursuit.PurePursuitTrajectoryTracker print: False @@ -97,3 +101,8 @@ variants: # agent_detection : pedestrian_detection.FakePedestrianDetector2D agent_detection : OmniscientAgentDetector #this option reads agents from the simulator state_estimation : OmniscientStateEstimator + planning: + motion_planning: + type: longitudinal_planning.YieldTrajectoryPlanner + args: + mode: 'simulation' \ No newline at end of file diff --git a/scenes/xyhead_demo.yaml b/scenes/xyhead_demo.yaml index c6d97477a..2dbc6c267 100644 --- a/scenes/xyhead_demo.yaml +++ b/scenes/xyhead_demo.yaml @@ -3,7 +3,9 @@ agents: ped1: type: pedestrian position: [15.0, 2.0] + # position: [10.0, 2.0] nominal_velocity: 0.5 target: [15.0,10.0] + # target: [10.0,10.0] behavior: loop \ No newline at end of file diff --git a/test_longitudinal_plan.py b/test_longitudinal_plan.py deleted file mode 100644 index 400f34fea..000000000 --- a/test_longitudinal_plan.py +++ /dev/null @@ -1,80 +0,0 @@ -#needed to import GEMstack from top level directory -import sys -import os -sys.path.append(os.getcwd()) - -from GEMstack.state import Path, ObjectFrameEnum -from GEMstack.onboard.planning.longitudinal_planning import longitudinal_plan,longitudinal_brake -import matplotlib.pyplot as plt - -def test_longitudinal_planning(): - test_path = Path(ObjectFrameEnum.START,[(0,0),(1,0),(2,0),(3,0),(4,0),(5,0),(6,0)]) - test_path2 = Path(ObjectFrameEnum.START,[(5,0),(6,1),(7,2),(9,4)]) - - test_traj = longitudinal_brake(test_path, 2.0, 0.0) - assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) - plt.plot(test_traj.times,[p[0] for p in test_traj.points]) - plt.title("Braking from 0 m/s (should just stay still)") - plt.xlabel('time') - plt.ylabel('position') - plt.show() - - test_traj = longitudinal_brake(test_path, 2.0, 2.0) - assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) - plt.plot(test_traj.times,[p[0] for p in test_traj.points]) - plt.title("Braking from 2 m/s") - plt.xlabel('time') - plt.ylabel('position') - plt.show() - - test_traj = longitudinal_plan(test_path, 1.0, 2.0, 3.0, 0.0) - assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) - plt.plot(test_traj.times,[p[0] for p in test_traj.points]) - plt.title("Accelerating from 0 m/s") - plt.xlabel('time') - plt.ylabel('position') - plt.show() - - - test_traj = longitudinal_plan(test_path, 1.0, 2.0, 3.0, 2.0) - assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) - plt.plot(test_traj.times,[p[0] for p in test_traj.points]) - plt.title("Accelerating from 2 m/s") - plt.xlabel('time') - plt.ylabel('position') - plt.show() - - test_traj = longitudinal_plan(test_path, 0.0, 2.0, 3.0, 3.1) - assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) - plt.plot(test_traj.times,[p[0] for p in test_traj.points]) - plt.title("Keeping constant velocity at 3.1 m/s") - plt.xlabel('time') - plt.ylabel('position') - plt.show() - - test_traj = longitudinal_plan(test_path, 2.0, 2.0, 20.0, 10.0) - assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) - plt.plot(test_traj.times,[p[0] for p in test_traj.points]) - plt.title("Too little time to stop, starting at 10 m/s") - plt.xlabel('time') - plt.ylabel('position') - plt.show() - - test_traj = longitudinal_brake(test_path, 2.0, 10.0) - plt.plot(test_traj.times,[p[0] for p in test_traj.points]) - plt.title("Too little time to stop, braking at 10 m/s") - plt.xlabel('time') - plt.ylabel('position') - plt.show() - - test_traj = longitudinal_plan(test_path2, 1.0, 2.0, 3.0, 0.0) - plt.plot(test_traj.times,[p[0] for p in test_traj.points]) - plt.title("Nonuniform planning") - plt.xlabel('time') - plt.ylabel('position') - plt.show() - - - -if __name__ == '__main__': - test_longitudinal_planning() diff --git a/testing/test_collision_detection.py b/testing/test_collision_detection.py new file mode 100644 index 000000000..4a8c35c26 --- /dev/null +++ b/testing/test_collision_detection.py @@ -0,0 +1,30 @@ +#needed to import GEMstack from top level directory +import sys +import os +sys.path.append(os.getcwd()) + +from GEMstack.onboard.planning.longitudinal_planning import CollisionDetector +import time + +if __name__ == "__main__": + # Vehicle parameters. x, y, theta (angle in radians) + x1, y1, t1 = 4.0, 5.0, 0 + # Pedestrian parameters. x, y, theta (angle in radians) + x2, y2, t2 = 15.0, 2.1, 0 + # Velocity vectors: [vx, vy] + v1 = [0.1, 0] # Vehicle speed vector + v2 = [0, 0.5] # Pedestrian speed vector + # Total simulation time + total_time = 10.0 + + desired_speed = 0.5 + acceleration = -0.5 + + # Create and run the simulation. + start_time = time.time() + # Simulate with the above parameters: Whether to hit without decelerating + sim = CollisionDetector(x1, y1, t1, x2, y2, t2, v1, v2, total_time, desired_speed, acceleration) + + collision_distance = sim.run(is_displayed=True) + print(f"Collision distance: {collision_distance}") + # print(f"Simulation took {time.time() - start_time:.3f} seconds.") \ No newline at end of file diff --git a/testing/test_longitudinal_plan.py b/testing/test_longitudinal_plan.py index ab16ca5e6..78896e893 100644 --- a/testing/test_longitudinal_plan.py +++ b/testing/test_longitudinal_plan.py @@ -6,7 +6,11 @@ from GEMstack.state import Path, ObjectFrameEnum from GEMstack.onboard.planning.longitudinal_planning import longitudinal_plan,longitudinal_brake import matplotlib.pyplot as plt - + + +mode = "milestone" # milestone, dt, dx + + def test_longitudinal_planning(): test_path = Path(ObjectFrameEnum.START,[(0,0),(1,0),(2,0),(3,0),(4,0),(5,0),(6,0)]) test_path2 = Path(ObjectFrameEnum.START,[(5,0),(6,1),(7,2),(9,4)]) @@ -39,7 +43,7 @@ def test_longitudinal_planning(): plt.ylabel('velocity') plt.show() - test_traj = longitudinal_plan(test_path, 1.1, 2.0, 3.0, 0.0) + test_traj = longitudinal_plan(test_path, 1.1, 2.0, 3.0, 0.0, mode) assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) plt.plot(test_traj.times,[p[0] for p in test_traj.points]) plt.title("Accelerating from 0 m/s") @@ -54,7 +58,7 @@ def test_longitudinal_planning(): plt.show() - test_traj = longitudinal_plan(test_path, 1.0, 2.0, 3.0, 2.0) + test_traj = longitudinal_plan(test_path, 1.0, 2.0, 3.0, 2.0, mode) assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) plt.plot(test_traj.times,[p[0] for p in test_traj.points]) plt.title("Accelerating from 2 m/s") @@ -69,7 +73,7 @@ def test_longitudinal_planning(): plt.show() - test_traj = longitudinal_plan(test_path, 0.0, 2.0, 3.0, 3.1) + test_traj = longitudinal_plan(test_path, 0.0, 2.0, 3.0, 3.1, mode) assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) plt.plot(test_traj.times,[p[0] for p in test_traj.points]) plt.title("Keeping constant velocity at 3.1 m/s") @@ -83,7 +87,7 @@ def test_longitudinal_planning(): plt.ylabel('velocity') plt.show() - test_traj = longitudinal_plan(test_path, 2.0, 2.0, 20.0, 10.0) + test_traj = longitudinal_plan(test_path, 2.0, 2.0, 20.0, 10.0, mode) assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) plt.plot(test_traj.times,[p[0] for p in test_traj.points]) plt.title("Too little time to stop, starting at 10 m/s") @@ -110,7 +114,7 @@ def test_longitudinal_planning(): plt.ylabel('velocity') plt.show() - test_traj = longitudinal_plan(test_path2, 1.0, 2.0, 3.0, 0.0) + test_traj = longitudinal_plan(test_path2, 1.0, 2.0, 3.0, 0.0, mode) plt.plot(test_traj.times,[p[0] for p in test_traj.points]) plt.title("Nonuniform planning") plt.xlabel('time')