From 6cd25e8230e0e17453702d2b4d42d0037d5c40d3 Mon Sep 17 00:00:00 2001 From: animeshsingh98 Date: Sat, 10 May 2025 15:10:47 -0500 Subject: [PATCH 1/7] Consolidating changes for PR to s2025 branch --- .gitignore | 11 +- .../knowledge/defaults/computation_graph.yaml | 6 + GEMstack/mathutils/quadratic_equation.py | 21 + GEMstack/offboard/log_management/s3.py | 248 +++ .../planning/capture_lidar_camera_data.py | 141 ++ .../onboard/planning/longitudinal_planning.py | 1343 +++++++++++++++++ .../planning/route_planning_component.py | 484 ++++++ GEMstack/scripts/__init__.py | 0 GEMstack/scripts/geotag.py | 167 ++ GEMstack/scripts/register_lidar_scans.py | 296 ++++ GEMstack/state/all.py | 11 +- GEMstack/state/intent.py | 2 + GEMstack/state/mission.py | 6 +- GEMstack/state/mission_plan.py | 19 + GEMstack/state/route.py | 19 +- GEMstack/state/trajectory.py | 25 +- launch/inspection.yaml | 86 ++ 17 files changed, 2862 insertions(+), 23 deletions(-) create mode 100644 GEMstack/mathutils/quadratic_equation.py create mode 100644 GEMstack/offboard/log_management/s3.py create mode 100644 GEMstack/onboard/planning/capture_lidar_camera_data.py create mode 100644 GEMstack/onboard/planning/longitudinal_planning.py create mode 100644 GEMstack/onboard/planning/route_planning_component.py create mode 100644 GEMstack/scripts/__init__.py create mode 100644 GEMstack/scripts/geotag.py create mode 100644 GEMstack/scripts/register_lidar_scans.py create mode 100644 GEMstack/state/mission_plan.py create mode 100644 launch/inspection.yaml diff --git a/.gitignore b/.gitignore index 3da224293..590f58b39 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ downloads/ eggs/ .eggs/ lib/ +!frontend/* lib64/ parts/ sdist/ @@ -32,6 +33,7 @@ share/python-wheels/ .installed.cfg *.egg MANIFEST +.idea/ # PyInstaller # Usually these files are written by a python script from a template @@ -134,6 +136,7 @@ venv/ ENV/ env.bak/ venv.bak/ +*.DS_Store # Spyder project settings .spyderproject @@ -170,8 +173,14 @@ cython_debug/ **/*.run .vscode/ setup/zed_sdk.run + +#Ignore ROS bags +*.bag + cuda/ +homework/yolov8n.pt +homework/yolo11n.pt GEMstack/knowledge/detection/yolov8n.pt GEMstack/knowledge/detection/yolo11n.pt yolov8n.pt -yolo11n.pt \ No newline at end of file +yolo11n.pt diff --git a/GEMstack/knowledge/defaults/computation_graph.yaml b/GEMstack/knowledge/defaults/computation_graph.yaml index d7a606326..949360f75 100644 --- a/GEMstack/knowledge/defaults/computation_graph.yaml +++ b/GEMstack/knowledge/defaults/computation_graph.yaml @@ -43,6 +43,12 @@ components: - route_planning: inputs: [vehicle, roadgraph, mission] outputs: route + - route_planning_component: + inputs: all + outputs: [ route, mission ] + - save_lidar_data: + inputs: all + outputs: - driving_logic: inputs: all outputs: intent diff --git a/GEMstack/mathutils/quadratic_equation.py b/GEMstack/mathutils/quadratic_equation.py new file mode 100644 index 000000000..6580fbec9 --- /dev/null +++ b/GEMstack/mathutils/quadratic_equation.py @@ -0,0 +1,21 @@ +import math + +def quad_root(a : float, b : float, c : float) -> float: + """Calculates the roots of a quadratic equation + + Args: + a (float): coefficient of x^2 + b (float): coefficient of x + c (float): constant term + + Returns: + float: returns all valid roots of the quadratic equation + """ + x1 = (-b + max(0,(b**2 - 4*a*c))**0.5)/(2*a) + x2 = (-b - max(0,(b**2 - 4*a*c))**0.5)/(2*a) + + if math.isnan(x1): x1 = 0 + if math.isnan(x2): x2 = 0 + + valid_roots = [n for n in [x1, x2] if not type(n) is complex] + return valid_roots \ No newline at end of file diff --git a/GEMstack/offboard/log_management/s3.py b/GEMstack/offboard/log_management/s3.py new file mode 100644 index 000000000..b8f2ea4c8 --- /dev/null +++ b/GEMstack/offboard/log_management/s3.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 +""" +This client interacts with S3 to upload (push) or download (pull) the latest folder. +For push, the latest folder in the local base directory (default "logs") +is determined based on its timestamp (folder name in "YYYY-MM-DD_HH-MM-SS" format). +For pull, the latest folder under the S3 prefix is selected. + +Before running this client make sure you have defined .env in root with following values: +AWS_ACCESS_KEY_ID=example +AWS_SECRET_ACCESS_KEY=example +AWS_DEFAULT_REGION=us-east-1 + +Requires: + pip install boto3 python-dotenv + +Example Usage: + + # Push the latest folder from the local "logs" directory to S3: + python3 -m GEMstack.offboard.log_management.s3 \ + --action push \ + --bucket cs588 \ + --s3-prefix captures + + # Pull (download) the latest folder from S3 into local "download/" directory: + python3 -m GEMstack.offboard.log_management.s3 \ + --action pull \ + --bucket cs588 \ + --s3-prefix captures \ + --dest-dir download +""" + +import argparse +import boto3 +import os +import sys +from datetime import datetime +from dotenv import load_dotenv + +def get_s3_client(): + """ + Initializes the S3 client using AWS credentials from environment variables. + Expects: + - AWS_ACCESS_KEY_ID + - AWS_SECRET_ACCESS_KEY + - AWS_DEFAULT_REGION + Exits if any of these are missing. + """ + # load environment variables from .env file (override local config if exists) + load_dotenv(override=True) + + access_key = os.environ.get('AWS_ACCESS_KEY_ID') + secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY') + region = os.environ.get('AWS_DEFAULT_REGION') + + if not access_key or not secret_key or not region: + sys.exit( + "Error: AWS credentials not set. Please set AWS_ACCESS_KEY_ID, " + "AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION environment variables (in .env)." + ) + + return boto3.client( + 's3', + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=region + ) + +def check_s3_connection(s3_client, bucket): + """ + Verifies that we can connect to S3 and access the specified bucket. + """ + try: + s3_client.head_bucket(Bucket=bucket) + print(f"Connection check: Successfully accessed bucket '{bucket}'") + except Exception as e: + sys.exit(f"Error: Could not connect to S3 bucket '{bucket}': {e}") + +def push_folder_to_s3(folder_path, bucket, s3_prefix): + """ + Walks through the folder and uploads each file to S3 under the given prefix. + Files are stored under the key: s3_prefix/folder_name/. + """ + s3_client = get_s3_client() + check_s3_connection(s3_client, bucket) + + folder_name = os.path.basename(folder_path) + files_uploaded = 0 + + for root, _, files in os.walk(folder_path): + for file in files: + local_path = os.path.join(root, file) + # determine the file's path relative to the folder being pushed. + relative_path = os.path.relpath(local_path, folder_path) + s3_key = os.path.join(s3_prefix, folder_name, relative_path) + try: + print(f"Uploading {local_path} to s3://{bucket}/{s3_key}") + s3_client.upload_file(local_path, bucket, s3_key) + files_uploaded += 1 + except Exception as e: + print(f"Error uploading {local_path}: {e}") + + if files_uploaded == 0: + sys.exit(f"Error: No files were uploaded from folder: {folder_path}") + +def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir): + """ + Downloads all objects from S3 whose key begins with s3_prefix/folder_name. + The folder will be recreated under `dest_dir/folder_name`. + """ + s3_client = get_s3_client() + check_s3_connection(s3_client, bucket) + + prefix = os.path.join(s3_prefix, folder_name) + + paginator = s3_client.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=prefix) + + found_files = False + for page in pages: + if 'Contents' not in page: + continue + for obj in page['Contents']: + key = obj['Key'] + if key.endswith("/"): + continue + + found_files = True + relative_path = os.path.relpath(key, prefix) + local_path = os.path.join(dest_dir, folder_name, relative_path) + + os.makedirs(os.path.dirname(local_path), exist_ok=True) + + print(f"Downloading s3://{bucket}/{key} to {local_path}") + try: + s3_client.download_file(bucket, key, local_path) + except Exception as e: + print(f"Error downloading s3://{bucket}/{key}: {e}") + + if not found_files: + sys.exit(f"Error: No files found in bucket '{bucket}' with prefix '{prefix}'") + +def get_latest_local_folder(base_dir): + """ + Scans the base directory for subdirectories and returns the one with the latest timestamp. + Assumes folder names are in the format "YYYY-MM-DD_HH-MM-SS". + """ + try: + subdirs = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))] + except FileNotFoundError: + sys.exit(f"Error: Base directory does not exist: {base_dir}") + + if not subdirs: + sys.exit(f"Error: No subdirectories found in base directory: {base_dir}") + + def parse_timestamp(folder): + try: + return datetime.strptime(folder, "%Y-%m-%d_%H-%M-%S") + except Exception: + return datetime.min + + latest = sorted(subdirs, key=parse_timestamp)[-1] + return latest + +def get_latest_s3_folder(s3_client, bucket, s3_prefix): + """ + Retrieves the latest folder name from S3 under the given prefix. + It lists common prefixes (folders) and returns the one with the latest timestamp. + Assumes folder names follow the format "YYYY-MM-DD_HH-MM-SS". + """ + response = s3_client.list_objects_v2( + Bucket=bucket, + Prefix=s3_prefix + "/", + Delimiter="/" + ) + if 'CommonPrefixes' not in response: + sys.exit(f"Error: No folders found in S3 bucket '{bucket}' with prefix '{s3_prefix}'") + + folders = [] + for cp in response['CommonPrefixes']: + prefix = cp.get('Prefix') + # Expect prefix like "captures/2025-02-12_15-30-00/" + parts = prefix.split('/') + if len(parts) >= 2: + folder_name = parts[-2] + folders.append(folder_name) + + if not folders: + sys.exit(f"Error: No valid folder names found in S3 bucket '{bucket}' with prefix '{s3_prefix}'") + + def parse_timestamp(folder): + try: + return datetime.strptime(folder, "%Y-%m-%d_%H-%M-%S") + except Exception: + return datetime.min + + latest = sorted(folders, key=parse_timestamp)[-1] + return latest + +def main(): + parser = argparse.ArgumentParser( + description="Push or pull the latest folder to/from an S3 bucket." + ) + parser.add_argument("--action", choices=["push", "pull"], required=True, + help="Choose whether to push (upload) or pull (download) a folder.") + # The --folder argument is now optional. If not provided, the latest folder is auto-detected. + parser.add_argument("--folder", default=None, + help="(Optional) Folder name (e.g. 2025-02-12_15-30-00). If omitted, the latest folder is selected.") + parser.add_argument("--base-dir", default="logs", + help="Local base directory where capture runs are stored (used for push).") + parser.add_argument("--dest-dir", default="download", + help="Local directory to place the downloaded folder (used for pull).") + parser.add_argument("--bucket", required=True, + help="S3 bucket name.") + parser.add_argument("--s3-prefix", default="captures", + help="S3 prefix (folder) where data is stored or will be uploaded.") + args = parser.parse_args() + + if args.action == "push": + # If no folder is provided, determine the latest local folder from the base directory. + if args.folder is None: + args.folder = get_latest_local_folder(args.base_dir) + print(f"Auto-detected latest local folder: {args.folder}") + folder_path = os.path.join(args.base_dir, args.folder) + if not os.path.exists(folder_path): + sys.exit(f"Error: Folder does not exist: {folder_path}") + + # check if the folder is empty. + folder_empty = True + for _, _, files in os.walk(folder_path): + if files: + folder_empty = False + break + if folder_empty: + sys.exit(f"Error: Folder is empty: {folder_path}") + + push_folder_to_s3(folder_path, args.bucket, args.s3_prefix) + + elif args.action == "pull": + # If no folder is provided, query S3 for the latest folder under the specified prefix. + if args.folder is None: + s3_client = get_s3_client() + check_s3_connection(s3_client, args.bucket) + args.folder = get_latest_s3_folder(s3_client, args.bucket, args.s3_prefix) + print(f"Auto-detected latest folder on S3: {args.folder}") + pull_folder_from_s3(args.bucket, args.s3_prefix, args.folder, args.dest_dir) + +if __name__ == '__main__': + main() diff --git a/GEMstack/onboard/planning/capture_lidar_camera_data.py b/GEMstack/onboard/planning/capture_lidar_camera_data.py new file mode 100644 index 000000000..fbb6c1de5 --- /dev/null +++ b/GEMstack/onboard/planning/capture_lidar_camera_data.py @@ -0,0 +1,141 @@ +from ...state import AllState, VehicleState, ObjectPose, ObjectFrameEnum, AgentState, AgentEnum, AgentActivityEnum, MissionEnum +from ..interface.gem import GEMInterface +from ..component import Component +import cv2 +from typing import Dict +import numpy as np +import rospy +from sensor_msgs.msg import PointCloud2, Image +import sensor_msgs.point_cloud2 as pc2 +import struct, ctypes +from message_filters import Subscriber, ApproximateTimeSynchronizer +from cv_bridge import CvBridge +import time +from ...offboard.log_management.s3 import push_folder_to_s3 +import math +import pickle +from datetime import datetime +import os + + +def pc2_to_numpy(pc2_msg, want_rgb=False): + """ + Convert a ROS PointCloud2 message into a numpy array with basic filtering. + + This function extracts the x, y, z coordinates from the point cloud and + filters out points with x <= 0 and z >= 2.5. + + Args: + pc2_msg: The ROS PointCloud2 message. + want_rgb (bool): Flag to indicate if RGB data is desired (not used here). + + Returns: + np.ndarray: Filtered point cloud array of shape (N, 3). + """ + gen = pc2.read_points(pc2_msg, skip_nans=True) + pts = np.array(list(gen), dtype=np.float32) + pts = pts[:, :3] # Only x, y, z coordinates + mask = (pts[:, 0] > 0) & (pts[:, 2] < 2.5) + return pts[mask] + + +def rad_to_deg(rad): + """Convert radians to degrees""" + return rad * 180.0 / math.pi + + +class SaveInspectionData(Component): + """The component helps in saving lidar, camera and gnss data when the vehicle is in inspection mode.""" + def __init__(self, vehicle_interface: GEMInterface): + self.vehicle_interface = vehicle_interface + self.current_agents = {} + self.tracked_agents = {} + self.pedestrian_counter = 0 + + self.latest_fr_image = None + self.latest_rr_image = None + self.latest_lidar = None + self.latitude = None + self.longitude = None + self.altitude = None + self.bridge = CvBridge() + + self.index = 0 + timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + self.folder_path = f'./inspection_data_{timestamp}' + os.makedirs(self.folder_path, exist_ok=True) + + def rate(self) -> float: + return 8.0 + + def state_inputs(self) -> list: + return ['all'] + + def state_outputs(self) -> list: + return [] + + def initialize(self): + # Instead of individual subscriptions, use message_filters to synchronize + self.fr_image = Subscriber('/camera_fr/arena_camera_node/image_raw', Image) + self.rr_image = Subscriber('/camera_rr/arena_camera_node/image_raw', Image) + self.lidar_sub = Subscriber('/ouster/points', PointCloud2) + self.gnss_sub = Subscriber('/septentrio_gnss/insnavgeod', PointCloud2) + + self.sync = ApproximateTimeSynchronizer([self.fr_image, + self.rr_image, + self.lidar_sub, + self.gnss_sub], + queue_size=10, slop=0.1) + self.sync.registerCallback(self.synchronized_callback) + + def synchronized_callback(self, fr_image_msg, rr_image_msg, lidar_msg, gnss_msg): + """ + This callback is triggered when both an image and a LiDAR message arrive within the slop. + It stores the latest synchronized sensor data for processing in update(). + """ + # Convert the image message to an OpenCV image (assuming it is already in cv2.Mat format or convert as needed) + try: + # Convert the ROS Image message to an OpenCV image (BGR format) + self.latest_fr_image = self.bridge.imgmsg_to_cv2(fr_image_msg, "bgr8") + self.latest_rr_image = self.bridge.imgmsg_to_cv2(rr_image_msg, "bgr8") + except Exception as e: + rospy.logerr("Failed to convert image: {}".format(e)) + self.latest_fr_image = None + self.latest_rr_image = None + # Convert the LiDAR message to a numpy array + self.latest_lidar = pc2_to_numpy(lidar_msg, want_rgb=False) + + # Get gnss data + self.latitude = rad_to_deg(gnss_msg.latitude) + self.longitude = rad_to_deg(gnss_msg.longitude) + self.altitude = gnss_msg.height + self.yaw = gnss_msg.heading + self.pitch = gnss_msg.pitch + self.roll = gnss_msg.roll + + def update(self, state) -> Dict[str, AgentState]: + # Process only if synchronized sensor data is available + if self.latest_fr_image is None and self.latest_rr_image is None or self.latest_lidar is None: + return + + if state.mission.type == MissionEnum.INSPECT: + lidar_pc = self.latest_lidar.copy() + camera_fr = self.latest_fr_image.copy() + camera_rr = self.latest_rr_image.copy() + cv2.imwrite(os.path.join(self.folder_path, f'fr_{self.index}.png'), camera_fr) + cv2.imwrite(os.path.join(self.folder_path, f'rr_{self.index}.png'), camera_rr) + + # Save GNSS data + with open(os.path.join(self.folder_path, 'gnss.txt'), 'a') as fh: + fh.write(f'{self.latitude},{self.longitude},{self.altitude},{self.yaw}, {self.roll}, {self.pitch}\n') + + # Save LIDAR point cloud + with open(os.path.join(self.folder_path, f'pc_{self.index}.b'), 'wb') as fh: + pickle.dump(lidar_pc, fh) + + self.index += 1 + + elif state.mission.type == MissionEnum.INSPECT_UPLOAD: + # upload lidar and camera to s3 + # Currently this is a sync function. Can be converted into async to avoid blocking call + push_folder_to_s3(self.folder_path, "cs588", "inspect_results") diff --git a/GEMstack/onboard/planning/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py new file mode 100644 index 000000000..65ad6c00a --- /dev/null +++ b/GEMstack/onboard/planning/longitudinal_planning.py @@ -0,0 +1,1343 @@ +from typing import List, Tuple, Union +from ..component import Component +from ...state import ( + AllState, + VehicleState, + EntityRelation, + EntityRelationEnum, + Path, + Trajectory, + Route, + ObjectFrameEnum, + AgentState, + MissionEnum, +) +from ...utils import serialization +from ...mathutils.transforms import vector_madd +from ...mathutils.quadratic_equation import quad_root + + +import time +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.patches as patches +import math +from scipy.optimize import minimize + + +# Global variables +PEDESTRIAN_LENGTH = 0.5 +PEDESTRIAN_WIDTH = 0.5 + +VEHICLE_LENGTH = 3.5 +VEHICLE_WIDTH = 2 + +VEHICLE_BUFFER_X = 3.0 +VEHICLE_BUFFER_Y = 1.5 + +YIELD_BUFFER_Y = 1.0 +V_MAX = 5 +COMFORT_DECELERATION = 1.5 + + +def detect_collision( + curr_x: float, + curr_y: float, + curr_v: float, + obj: AgentState, + min_deceleration: float, + max_deceleration: float, + acceleration: float, + max_speed: float, +) -> Tuple[bool, Union[float, List[float]]]: + """Detects if a collision will occur with the given object and return deceleration to avoid it.""" + + # Get the object's position and velocity + obj_x = obj.pose.x + obj_y = obj.pose.y + obj_v_x = obj.velocity[0] + obj_v_y = obj.velocity[1] + + if obj.pose.frame == ObjectFrameEnum.CURRENT: + # Simulation: Current + obj_x = obj.pose.x + curr_x + obj_y = obj.pose.y + curr_y + print("PEDESTRIAN", obj_x, obj_y) + + vehicle_front = curr_x + VEHICLE_LENGTH + vehicle_back = curr_x + vehicle_left = curr_y + VEHICLE_WIDTH / 2 + vehicle_right = curr_y - VEHICLE_WIDTH / 2 + + pedestrian_front = obj_x + PEDESTRIAN_LENGTH / 2 + pedestrian_back = obj_x - PEDESTRIAN_LENGTH / 2 + pedestrian_left = obj_y + PEDESTRIAN_WIDTH / 2 + pedestrian_right = obj_y - PEDESTRIAN_WIDTH / 2 + + # Check if the object is in front of the vehicle + if vehicle_front > pedestrian_back: + if vehicle_back > pedestrian_front: + # The object is behind the vehicle + print("Object is behind the vehicle") + return False, 0.0 + if ( + vehicle_right - VEHICLE_BUFFER_Y > pedestrian_left + or vehicle_left + VEHICLE_BUFFER_Y < pedestrian_right + ): + # The object is to the side of the vehicle + print("Object is to the side of the vehicle") + return False, 0.0 + # The object overlaps with the vehicle's buffer + return True, max_deceleration + + if vehicle_right - VEHICLE_BUFFER_Y > pedestrian_left and obj_v_y <= 0: + # The object is to the right of the vehicle and moving away + print("Object is to the right of the vehicle and moving away") + return False, 0.0 + + if vehicle_left + VEHICLE_BUFFER_Y < pedestrian_right and obj_v_y >= 0: + # The object is to the left of the vehicle and moving away + print("Object is to the left of the vehicle and moving away") + return False, 0.0 + + if vehicle_front + VEHICLE_BUFFER_X >= pedestrian_back and ( + vehicle_right - VEHICLE_BUFFER_Y <= pedestrian_left + and vehicle_left + VEHICLE_BUFFER_Y >= pedestrian_right + ): + # The object is in front of the vehicle and within the buffer + print("Object is in front of the vehicle and within the buffer") + return True, max_deceleration + + # Calculate the deceleration needed to avoid a collision + print("Object is in front of the vehicle and outside the buffer") + distance = pedestrian_back - vehicle_front + distance_with_buffer = pedestrian_back - vehicle_front - VEHICLE_BUFFER_X + + relative_v = curr_v - obj_v_x + if relative_v <= 0: + return False, 0.0 + + if obj_v_y == 0: + # The object is in front of the vehicle blocking it + deceleration = relative_v**2 / (2 * distance_with_buffer) + if deceleration > max_deceleration: + return True, max_deceleration + if deceleration < min_deceleration: + return False, 0.0 + + return True, deceleration + + print(relative_v, distance_with_buffer) + + if obj_v_y > 0: + # The object is to the right of the vehicle and moving towards it + time_to_get_close = ( + vehicle_right - VEHICLE_BUFFER_Y - YIELD_BUFFER_Y - pedestrian_left + ) / abs(obj_v_y) + time_to_pass = ( + vehicle_left + VEHICLE_BUFFER_Y + YIELD_BUFFER_Y - pedestrian_right + ) / abs(obj_v_y) + else: + # The object is to the left of the vehicle and moving towards it + time_to_get_close = ( + pedestrian_right - vehicle_left - VEHICLE_BUFFER_Y - YIELD_BUFFER_Y + ) / abs(obj_v_y) + time_to_pass = ( + pedestrian_left - vehicle_right + VEHICLE_BUFFER_Y + YIELD_BUFFER_Y + ) / abs(obj_v_y) + + time_to_accel_to_max_speed = (max_speed - curr_v) / acceleration + distance_to_accel_to_max_speed = ( + (max_speed + curr_v - 2 * obj_v_x) * time_to_accel_to_max_speed / 2 + ) # area of trapezoid + + if distance_to_accel_to_max_speed > distance_with_buffer: + # The object will reach the buffer before reaching max speed + time_to_buffer_when_accel = ( + -relative_v + + (relative_v * relative_v + 2 * distance_with_buffer * acceleration) ** 0.5 + ) / acceleration + else: + # The object will reach the buffer after reaching max speed + time_to_buffer_when_accel = time_to_accel_to_max_speed + ( + distance_with_buffer - distance_to_accel_to_max_speed + ) / (max_speed - obj_v_x) + + if distance_to_accel_to_max_speed > distance: + # We will collide before reaching max speed + time_to_collide_when_accel = ( + -relative_v + (relative_v * relative_v + 2 * distance * acceleration) ** 0.5 + ) / acceleration + else: + # We will collide after reaching max speed + time_to_collide_when_accel = time_to_accel_to_max_speed + ( + distance - distance_to_accel_to_max_speed + ) / (max_speed - obj_v_x) + + if time_to_get_close > time_to_collide_when_accel: + # We can do normal driving and will pass the object before it gets in our way + print( + "We can do normal driving and will pass the object before it gets in our way" + ) + return False, 0.0 + + if vehicle_front + VEHICLE_BUFFER_X >= pedestrian_back: + # We cannot move pass the pedestrian before it reaches the buffer from side + return True, max_deceleration + + if time_to_pass < time_to_buffer_when_accel: + # The object will pass through our front before we drive normally and reach it + print( + "The object will pass through our front before we drive normally and reach it" + ) + return False, 0.0 + + distance_to_move = distance_with_buffer + time_to_pass * obj_v_x + + if curr_v**2 / (2 * distance_to_move) >= COMFORT_DECELERATION: + return True, curr_v**2 / (2 * distance_to_move) + + print("Calculating cruising speed") + return True, [distance_to_move, time_to_pass] + + +def detect_collision_analytical( + r_pedestrain_x: float, + r_pedestrain_y: float, + p_vehicle_left_y_after_t: float, + p_vehicle_right_y_after_t: float, + lateral_buffer: float, +) -> Union[bool, str]: + """Detects if a collision will occur with the given object and return deceleration to avoid it. Analytical""" + if r_pedestrain_x < 0 and abs(r_pedestrain_y) > lateral_buffer: + return False + elif r_pedestrain_x < 0: + return "max" + if ( + r_pedestrain_y >= p_vehicle_left_y_after_t + and r_pedestrain_y <= p_vehicle_right_y_after_t + ): + return True + + return False + + +def get_minimum_deceleration_for_collision_avoidance( + curr_x: float, + curr_y: float, + curr_v: float, + obj: AgentState, + min_deceleration: float, + max_deceleration: float, +) -> Tuple[bool, float]: + """Detects if a collision will occur with the given object and return deceleration to avoid it. Via Optimization""" + + # Get the object's position and velocity + obj_x = obj.pose.x + obj_y = obj.pose.y + obj_v_x = obj.velocity[0] + obj_v_y = obj.velocity[1] + + if obj.pose.frame == ObjectFrameEnum.CURRENT: + obj_x = obj.pose.x + curr_x + obj_y = obj.pose.y + curr_y + + obj_x = obj_x - curr_x + obj_y = obj_y - curr_y + + curr_x = curr_x - curr_x + curr_y = curr_y - curr_y + + vehicle_front = curr_x + VEHICLE_LENGTH + VEHICLE_BUFFER_X + vehicle_back = curr_x + vehicle_left = curr_y - VEHICLE_WIDTH / 2 + vehicle_right = curr_y + VEHICLE_WIDTH / 2 + + r_vehicle_front = vehicle_front - vehicle_front + r_vehicle_back = vehicle_back - vehicle_front + r_vehicle_left = vehicle_left - VEHICLE_BUFFER_Y + r_vehicle_right = vehicle_right + VEHICLE_BUFFER_Y + r_vehicle_v_x = curr_v + r_vehicle_v_y = 0 + + r_pedestrain_x = obj_x - vehicle_front + r_pedestrain_y = -obj_y + r_pedestrain_v_x = obj_v_x + r_pedestrain_v_y = -obj_v_y + + r_velocity_x_from_vehicle = r_vehicle_v_x - r_pedestrain_v_x + r_velocity_y_from_vehicle = r_vehicle_v_y - r_pedestrain_v_y + + t_to_r_pedestrain_x = (r_pedestrain_x - r_vehicle_front) / r_velocity_x_from_vehicle + + p_vehicle_left_y_after_t = ( + r_vehicle_left + r_velocity_y_from_vehicle * t_to_r_pedestrain_x + ) + p_vehicle_right_y_after_t = ( + r_vehicle_right + r_velocity_y_from_vehicle * t_to_r_pedestrain_x + ) + + collision_flag = detect_collision_analytical( + r_pedestrain_x, + r_pedestrain_y, + p_vehicle_left_y_after_t, + p_vehicle_right_y_after_t, + VEHICLE_BUFFER_Y, + ) + if collision_flag == False: + print( + "No collision", + curr_x, + curr_y, + r_pedestrain_x, + r_pedestrain_y, + r_vehicle_left, + r_vehicle_right, + p_vehicle_left_y_after_t, + p_vehicle_right_y_after_t, + ) + return 0.0, r_pedestrain_x + elif collision_flag == "max": + return max_deceleration, r_pedestrain_x + + print( + "Collision", + curr_x, + curr_y, + r_pedestrain_x, + r_pedestrain_y, + r_vehicle_left, + r_vehicle_right, + p_vehicle_left_y_after_t, + p_vehicle_right_y_after_t, + ) + + minimum_deceleration = None + if abs(r_velocity_y_from_vehicle) > 0.1: + if r_velocity_y_from_vehicle > 0.1: + # Vehicle Left would be used to yield + r_pedestrain_y_temp = r_pedestrain_y + abs(r_vehicle_left) + elif r_velocity_y_from_vehicle < -0.1: + # Vehicle Right would be used to yield + r_pedestrain_y_temp = r_pedestrain_y - abs(r_vehicle_right) + + softest_accleration = ( + 2 + * r_velocity_y_from_vehicle + * ( + r_velocity_y_from_vehicle * r_pedestrain_x + - r_velocity_x_from_vehicle * r_pedestrain_y_temp + ) + / r_pedestrain_y_temp**2 + ) + peak_y = ( + -(r_velocity_x_from_vehicle * r_velocity_y_from_vehicle) + / softest_accleration + ) + # if the peak is within the position of the pedestrian, + # then it indicates the path had already collided with the pedestrian, + # and so the softest acceleration should be the one the peak of the path is the same as the pedestrain's x position + # and the vehicle should be stopped exactly before the pedestrain's x position + if abs(peak_y) > abs(r_pedestrain_y_temp): + minimum_deceleration = abs(softest_accleration) + # else: the vehicle should be stopped exactly before the pedestrain's x position the same case as the pedestrain barely move laterally + if minimum_deceleration is None: + minimum_deceleration = r_velocity_x_from_vehicle**2 / (2 * r_pedestrain_x) + + print("calculated minimum deceleration: ", minimum_deceleration) + + if minimum_deceleration < min_deceleration: + return 0.0, r_pedestrain_x + else: + return ( + max(min(minimum_deceleration, max_deceleration), min_deceleration), + r_pedestrain_x, + ) + + +################################################################################ +########## Longitudinal Planning ############################################### +################################################################################ + + +def longitudinal_plan( + path: Path, + acceleration: float, + deceleration: float, + max_speed: float, + current_speed: float, + method: str, +) -> Trajectory: + """Generates a longitudinal trajectory for a path with a + trapezoidal velocity profile. + + 1. accelerates from current speed toward max speed + 2. travel along max speed + 3. if at any point you can't brake before hitting the end of the path, + decelerate with accel = -deceleration until velocity goes to 0. + """ + + if method == "milestone": + return longitudinal_plan_milestone( + path, acceleration, deceleration, max_speed, current_speed + ) + elif method == "dt": + return longitudinal_plan_dt( + path, acceleration, deceleration, max_speed, current_speed + ) + elif method == "dx": + return longitudinal_plan_dx( + path, acceleration, deceleration, max_speed, current_speed + ) + else: + raise NotImplementedError( + "Invalid method, only milestone, dt, adn dx are implemented." + ) + + +def longitudinal_plan_milestone( + path: Path, + acceleration: float, + deceleration: float, + max_speed: float, + current_speed: float, +) -> Trajectory: + """Generates a longitudinal trajectory for a path with a + trapezoidal velocity profile. + + 1. accelerates from current speed toward max speed + 2. travel along max speed + 3. if at any point you can't brake before hitting the end of the path, + decelerate with accel = -deceleration until velocity goes to 0. + """ + # Extrapolation factor for the points + factor = 5.0 + new_points = [] + for idx, point in enumerate(path.points[:-1]): + next_point = path.points[idx + 1] + if point[0] == next_point[0]: + break + xarange = np.arange( + point[0], next_point[0], (next_point[0] - point[0]) / factor + ) + if point[1] == next_point[1]: + yarange = [point[1]] * len(xarange) + else: + yarange = np.arange( + point[1], next_point[1], (next_point[1] - point[1]) / factor + ) + print(yarange) + for x, y in zip(xarange, yarange): + new_points.append((x, y)) + new_points.append(path.points[-1]) + + print("new points", new_points) + path = Path(path.frame, new_points) + + path_normalized = path.arc_length_parameterize() + points = [p for p in path_normalized.points] + times = [t for t in path_normalized.times] + # ============================================= + + print("-----LONGITUDINAL PLAN-----") + # print("path length: ", path.length()) + length = path.length() + + # If the path is too short, just return the path for preventing sudden halt of simulation + if length < 0.05: + return Trajectory(path.frame, points, times) + + # Starting point + x0 = points[0][0] + cur_point = points[0] + cur_time = times[0] + cur_index = 0 + + new_points = [] + new_times = [] + velocities = [] # for graphing and debugging purposes + + while current_speed > 0 or cur_index == 0: + # we want to iterate through all the points and add them + # to the new points. However, we also want to add "critical points" + # where we reach top speed, begin decelerating, and stop + new_points.append(cur_point) + new_times.append(cur_time) + velocities.append(current_speed) + # print("=====================================") + # print("new points: ", new_points) + # print("current index: ", cur_index) + # print("current speed: ", current_speed) + # print("current position: ", cur_point) + + # Information we will need: + # Calculate how much time it would take to stop + # Calculate how much distance it would take to stop + min_delta_t_stop = current_speed / deceleration + min_delta_x_stop = ( + current_speed * min_delta_t_stop - 0.5 * deceleration * min_delta_t_stop**2 + ) + # print(min_delta_x_stop) + assert min_delta_x_stop >= 0 + + # Check if we are done + + # If we cannot stop before or stop exactly at the final position requested + if cur_point[0] + min_delta_x_stop >= points[-1][0]: + # put on the breaks + + # Calculate the next point in a special manner because of too-little time to stop + if cur_index == len(points) - 1: + # the next point in this instance would be when we stop + next_point = (cur_point[0] + min_delta_x_stop, 0) + else: + next_point = points[cur_index + 1] + + # keep breaking until the next milestone in path + if next_point[0] <= points[-1][0]: + # print("continuing to next point") + delta_t_to_next_x = compute_time_to_x( + cur_point[0], next_point[0], current_speed, -deceleration + ) + cur_time += delta_t_to_next_x + cur_point = next_point + current_speed -= deceleration * delta_t_to_next_x + cur_index += 1 + else: + # continue to the point in which we would stop (current_velocity = 0) + # update to the next point + delta_t_to_next_x = compute_time_to_x( + cur_point[0], next_point[0], current_speed, -deceleration + ) + cur_point = next_point + cur_time += delta_t_to_next_x + # current_speed would not be exactly zero error would be less than 1e-4 but perfer to just set to zero + # current_speed -= delta_t_to_next_x*deceleration + current_speed = 0 + assert current_speed == 0 + + # This is the case where we are accelerating to max speed + # because the first if-statement covers for when we decelerating, + # the only time current_speed < max_speed is when we are accelerating + elif current_speed < max_speed: + # print("In case two") + # next point + next_point = points[cur_index + 1] + # accelerate to max speed + + # calculate the time it would take to reach max speed + delta_t_to_max_speed = (max_speed - current_speed) / acceleration + # calculate the distance it would take to reach max speed + delta_x_to_max_speed = ( + current_speed * delta_t_to_max_speed + + 0.5 * acceleration * delta_t_to_max_speed**2 + ) + + delta_t_to_stop_from_max_speed = max_speed / deceleration + delta_x_to_stop_from_max_speed = ( + max_speed * delta_t_to_stop_from_max_speed + - 0.5 * deceleration * delta_t_to_stop_from_max_speed**2 + ) + + delta_t_to_next_point = compute_time_to_x( + cur_point[0], next_point[0], current_speed, acceleration + ) + velocity_at_next_point = ( + current_speed + delta_t_to_next_point * acceleration + ) + time_to_stop_from_next_point = velocity_at_next_point / deceleration + delta_x_to_stop_from_next_point = ( + velocity_at_next_point * time_to_stop_from_next_point + - 0.5 * deceleration * time_to_stop_from_next_point**2 + ) + # if we would reach max speed after the next point, + # just move to the next point and update the current speed and time + if ( + next_point[0] + delta_x_to_stop_from_next_point < points[-1][0] + and cur_point[0] + delta_x_to_max_speed >= next_point[0] + ): + # ("go to next point") + # accelerate to max speed + delta_t_to_next_x = compute_time_to_x( + cur_point[0], next_point[0], current_speed, acceleration + ) + cur_time += delta_t_to_next_x + cur_point = [next_point[0], 0] + current_speed += delta_t_to_next_x * acceleration + cur_index += 1 + + # This is the case where we would need to start breaking before reaching + # top speed and before the next point (i.e. triangle shape velocity) + elif ( + cur_point[0] + delta_x_to_max_speed + delta_x_to_stop_from_max_speed + >= points[-1][0] + ): + # print(delta_x_to_max_speed) + # print(delta_x_to_stop_from_max_speed) + # Add a new point at the point where we should start breaking + # print("Adding new point to start breaking") + delta_t_to_next_x = compute_time_triangle( + cur_point[0], + points[-1][0], + current_speed, + 0, + acceleration, + deceleration, + ) + # print(delta_t_to_next_x) + # delta_t_to_next_x = compute_time_to_x(cur_point[0], points[-1][0] - min_delta_x_stop, current_speed, acceleration) + next_x = ( + cur_point[0] + + current_speed * delta_t_to_next_x + + 0.5 * acceleration * delta_t_to_next_x**2 + ) + cur_time += delta_t_to_next_x + cur_point = [next_x, 0] + current_speed += delta_t_to_next_x * acceleration + + # this is the case where we would reach max speed before the next point + # we need to create a new point where we would reach max speed + else: + # print("adding new point") + # we would need to add a new point at max speed + cur_time += delta_t_to_max_speed + cur_point = [cur_point[0] + delta_x_to_max_speed, 0] + current_speed = max_speed + + # This is the case where we are at max speed + # special functionality is that this block must + # add a point where we would need to start declerating to reach + # the final point + elif current_speed == max_speed: + next_point = points[cur_index + 1] + # continue on with max speed + # print("In case three") + + # add point to start decelerating + if next_point[0] + min_delta_x_stop >= points[-1][0]: + # print("Adding new point to start decelerating") + cur_time += ( + points[-1][0] - min_delta_x_stop - cur_point[0] + ) / current_speed + cur_point = [points[-1][0] - min_delta_x_stop, 0] + current_speed = max_speed + else: + # Continue on to next point + # print("Continuing on to next point") + cur_time += (next_point[0] - cur_point[0]) / current_speed + cur_point = next_point + cur_index += 1 + + # This is an edge case and should only be reach + # if the initial speed is greater than the max speed + elif current_speed > max_speed: + # We need to hit the breaks + + next_point = points[cur_index + 1] + # print("In case four") + # slow down to max speed + delta_t_to_max_speed = (current_speed - max_speed) / deceleration + delta_x_to_max_speed = ( + current_speed * delta_t_to_max_speed + - 0.5 * deceleration * delta_t_to_max_speed**2 + ) + + # If we would reach the next point before slowing down to max speed + # keep going until we reach the next point + if cur_point[0] + delta_x_to_max_speed >= next_point[0]: + delta_t_to_next_x = compute_time_to_x( + cur_point[0], next_point[0], current_speed, -deceleration + ) + cur_time += delta_t_to_next_x + cur_point = [next_point[0], 0] + current_speed -= delta_t_to_next_x * deceleration + cur_index += 1 + else: + # We would reach max speed before the next point + # we need to add a new point at the point where we + # would reach max speed + cur_time += delta_t_to_max_speed + cur_point = [cur_point[0] + delta_x_to_max_speed, 0] + current_speed = max_speed + + else: + # not sure what falls here + raise ValueError("LONGITUDINAL PLAN ERROR: Not sure how we ended up here") + + new_points.append(cur_point) + new_times.append(cur_time) + velocities.append(current_speed) + + points = new_points + times = new_times + print("[PLAN] Computed points:", points) + print("[TIME] Computed time:", times) + print("[Velocities] Computed velocities:", velocities) + + # ============================================= + + trajectory = Trajectory(path.frame, points, times, velocities) + return trajectory + + +def compute_time_to_x(x0: float, x1: float, v: float, a: float) -> float: + """Computes the time to go from x0 to x1 with initial velocity v0 and final velocity v1 + with constant acceleration a. I am assuming that we will always have a solution by settings + discriminant equal to zero, i'm not sure if this is an issue.""" + + """Consider changing the system to use linear operators instead of explicitly calculating because of instances here""" + + t1 = (-v + max(0, (v**2 - 2 * a * (x0 - x1))) ** 0.5) / a + t2 = (-v - max(0, (v**2 - 2 * a * (x0 - x1))) ** 0.5) / a + + if math.isnan(t1): + t1 = 0 + if math.isnan(t2): + t2 = 0 + + valid_times = [n for n in [t1, t2] if n > 0] + if valid_times: + return min(valid_times) + else: + return 0.0 + + +def compute_time_triangle( + x0: float, xf: float, v0: float, vf: float, acceleration: float, deceleration: float +) -> float: + """ + Compute the time to go from current point assuming we are accelerating to the point at which + we would need to start breaking in order to reach the final point with velocity 0. + """ + roots = quad_root( + 0.5 * acceleration + + acceleration**2 / deceleration + - 0.5 * acceleration**2 / deceleration, + v0 + 2 * acceleration * v0 / deceleration - acceleration * v0 / deceleration, + x0 - xf + v0**2 / deceleration - 0.5 * v0**2 / deceleration, + ) + t1 = max(roots) + assert t1 > 0 + return t1 + + +def solve_for_v_peak( + v0: float, acceleration: float, deceleration: float, total_length: float +) -> float: + + if acceleration <= 0 or deceleration <= 0: + raise ValueError("Acceleration and deceleration cant be negative") + + # Formuala: (v_peak^2 - v0^2)/(2*a) + v_peak^2/(2*d) = total_length + numerator = deceleration * v0**2 + 2 * acceleration * deceleration * total_length + denominator = acceleration + deceleration + v_peak_sq = numerator / denominator + + if v_peak_sq < 0: + return 0.0 + + return math.sqrt(v_peak_sq) + + +def compute_dynamic_dt(acceleration, speed, k=0.01, a_min=0.5): + position_step = k * max(speed, 1.0) # Ensures position step is speed-dependent + return np.sqrt(2 * position_step / max(acceleration, a_min)) + + +def longitudinal_plan_dt( + path, + acceleration: float, + deceleration: float, + max_speed: float, + current_speed: float, +): + # 1 parametrizatiom. + path_norm = path.arc_length_parameterize(speed=1.0) + total_length = path.length() + + # ------------------- + # If the path is too short, just return the path for preventing sudden halt of simulation + if total_length < 0.05: + points = [p for p in path_norm.points] + times = [t for t in path_norm.times] + return Trajectory(path.frame, points, times) + # ------------------- + + # 2. Compute distances for d_accel,d_decel + if max_speed > current_speed: + d_accel = (max_speed**2 - current_speed**2) / (2 * acceleration) + else: + d_accel = 0.0 # Already at or above max_speed + + d_decel = (max_speed**2) / (2 * deceleration) + + # 3. trapezoidal or triangle? + if d_accel + d_decel <= total_length: + t_accel = ( + (max_speed - current_speed) / acceleration + if max_speed > current_speed + else 0.0 + ) + t_decel = max_speed / deceleration + d_cruise = total_length - d_accel - d_decel + t_cruise = d_cruise / max_speed if max_speed != 0 else 0.0 + t_final = t_accel + t_cruise + t_decel + profile_type = "trapezoidal" + else: + # Triangular profile: not enough distance to reach max_speed so we will calculate peak speed. + peak_speed = solve_for_v_peak( + current_speed, acceleration, deceleration, total_length + ) + # choose the min just in case + peak_speed = min(peak_speed, max_speed) + t_accel = ( + (peak_speed - current_speed) / acceleration + if peak_speed > current_speed + else 0.0 + ) + t_decel = peak_speed / deceleration + t_final = t_accel + t_decel + profile_type = "triangular" + + t = 0 + times = [] + s_vals = [] + velocities = [] # for graphing and debugging purposes + + num_time_steps = 0 + speed = current_speed + while t < t_final: + times.append(t) + velocities.append(speed) + if profile_type == "trapezoidal": + if t < t_accel: + # Acceleration phase. + s = current_speed * t + 0.5 * acceleration * t**2 + speed = current_speed + acceleration * t + elif t < t_accel + t_cruise: + # Cruise phase. + s = d_accel + max_speed * (t - t_accel) + else: + # Deceleration phase. + t_decel_phase = t - (t_accel + t_cruise) + s = total_length - 0.5 * deceleration * (t_decel - t_decel_phase) ** 2 + speed = speed - deceleration * (t_decel - t_decel_phase) + else: # Triangular profile. + if t < t_accel: + # Acceleration phase. + s = current_speed * t + 0.5 * acceleration * t**2 + speed = current_speed + acceleration * t + else: + t_decel_phase = t - t_accel + s_accel = current_speed * t_accel + 0.5 * acceleration * t_accel**2 + s = ( + s_accel + + peak_speed * t_decel_phase + - 0.5 * deceleration * t_decel_phase**2 + ) + speed = speed - deceleration * t_decel_phase + + s_vals.append(min(s, total_length)) + if s >= total_length: + break + + dt = compute_dynamic_dt( + acceleration if t < t_accel else deceleration, current_speed + ) + t = t + dt + + num_time_steps += 1 + + # Compute trajectory points + points = [path_norm.eval(s) for s in s_vals] + print("Number of time steps is --------------------", num_time_steps) + + # return Trajectory(path_norm.frame, points, times) + + # # Plot: update a single window + # import matplotlib.pyplot as plt + # plt.figure("Distance vs Time") + # plt.clf() # Clear the current figure + # plt.plot(times, s_vals) + # plt.xlabel("Time (s)") + # plt.ylabel("Distance (m)") + # plt.title("Distance vs Time") + # plt.draw() + # plt.pause(0.001) + + # 4. Create a time grid. + # dt = 0.1 # adjust based on computation + # times = np.arange(0, t_final + dt, dt) + # num_time_steps = 0 + + # # 5. Compute the distance s(t) for each time step. + # s_vals = [] + # for t in times: + # if profile_type == "trapezoidal": + # if t < t_accel: + # # Acceleration phase. + # s = current_speed * t + 0.5 * acceleration * t**2 + # elif t < t_accel + t_cruise: + # # Cruise phase. + # s = d_accel + max_speed * (t - t_accel) + # else: + # # Deceleration phase. + # t_decel_phase = t - (t_accel + t_cruise) + # # Compute the remaining distance using the deceleration equation. + # s = total_length - 0.5 * deceleration * (t_decel - t_decel_phase)**2 + # else: # Triangular profile. + # if t < t_accel: + # # Acceleration phase. + # s = current_speed * t + 0.5 * acceleration * t**2 + # else: + # t_decel_phase = t - t_accel + # s_accel = current_speed * t_accel + 0.5 * acceleration * t_accel**2 + # s = s_accel + peak_speed * t_decel_phase - 0.5 * deceleration * t_decel_phase**2 + # num_time_steps +=1 + + # # should not exceed total path length + # s_vals.append(min(s, total_length)) + # print("NUmber of time steps -----------",num_time_steps) + # print("T FInal ----------------------------", t_final) + # points = [path_norm.eval(s) for s in s_vals] + + trajectory = Trajectory(path_norm.frame, points, list(times), velocities=velocities) + return trajectory + + +def longitudinal_plan_dx( + path: Path, + acceleration: float, + deceleration: float, + max_speed: float, + current_speed: float, +) -> Trajectory: + """Generates a longitudinal trajectory for a path with a + trapezoidal velocity profile. + + 1. accelerates from current speed toward max speed + 2. travel along max speed + 3. if at any point you can't brake before hitting the end of the path, + decelerate with accel = -deceleration until velocity goes to 0. + """ + path_normalized = path.arc_length_parameterize() + points = [p for p in path_normalized.points] + times = [t for t in path_normalized.times] + + # ============================================= + # Adjust these two numbers to choose between computation speed or smoothness + rq = 0.1 # Smaller, smoother + multi = 5 # Larger, smoother + print("-----LONGITUDINAL PLAN-----") + print("path length: ", path.length()) + length = path.length() + + # If the path is too short, just return the path for preventing sudden halt of simulation + if length < 0.05: + return Trajectory(path.frame, points, times) + + # This assumes that the time denomination cannot be changed + + # Starting point + x0 = points[0][0] + cur_point = points[0] + cur_time = times[0] + cur_index = 0 + acc = 0 + + new_points = [] + new_times = [] + velocities = [] # for graphing and debugging purposes + + while current_speed > 0 or cur_index == 0: + # we want to iterate through all the points and add them + # to the new points. However, we also want to add "critical points" + # where we reach top speed, begin decelerating, and stop + new_points.append(cur_point) + new_times.append(cur_time) + velocities.append(current_speed) + print("=====================================") + print("new points: ", new_points) + print("current index: ", cur_index) + print("current speed: ", current_speed) + + # Information we will need: + # Calculate how much time it would take to stop + # Calculate how much distance it would take to stop + min_delta_t_stop = current_speed / deceleration + min_delta_x_stop = ( + current_speed * min_delta_t_stop - 0.5 * deceleration * min_delta_t_stop**2 + ) + assert min_delta_x_stop >= 0 + + # Check if we are done + + # If we cannot stop before or stop exactly at the final position requested + if cur_point[0] + min_delta_x_stop >= points[-1][0] - 0.0001: + acc = deceleration + flag = 1 + print("In case one") + # put on the breaks + # Calculate the next point in a special manner because of too-little time to stop + if cur_index >= len(points) - 1: + # the next point in this instance would be when we stop + print(1) + if min_delta_x_stop < rq * acc: + next_point = (cur_point[0] + min_delta_x_stop, 0) + else: + next_point = (cur_point[0] + (min_delta_x_stop / (acc * multi)), 0) + flag = 0 + else: + print(2) + next_point = points[cur_index + 1] + if next_point[0] - cur_point[0] > rq * acc: + tmp = cur_point[0] + (next_point[0] - cur_point[0]) / (acc * multi) + flag = 0 + next_point = [tmp, next_point[1]] + + # keep breaking until the next milestone in path + print("continuing to next point") + delta_t_to_next_x = compute_time_to_x( + cur_point[0], next_point[0], current_speed, -deceleration + ) + cur_time += delta_t_to_next_x + cur_point = next_point + current_speed -= deceleration * delta_t_to_next_x + if flag: + cur_index += 1 + + # This is the case where we are accelerating to max speed + # because the first if-statement covers for when we decelerating, + # the only time current_speed < max_speed is when we are accelerating + elif current_speed < max_speed: + print("In case two") + print(current_speed) + acc = acceleration + flag = 1 + # next point + next_point = points[cur_index + 1] + if next_point[0] - cur_point[0] > rq * acc: + tmp = cur_point[0] + (next_point[0] - cur_point[0]) / (acc * multi) + flag = 0 + next_point = [tmp, next_point[1]] + # accelerate to max speed + + # calculate the time it would take to reach max speed + delta_t_to_max_speed = (max_speed - current_speed) / acceleration + # calculate the distance it would take to reach max speed + delta_x_to_max_speed = ( + current_speed * delta_t_to_max_speed + + 0.5 * acceleration * delta_t_to_max_speed**2 + ) + + # if we would reach max speed after the next point, + # just move to the next point and update the current speed and time + if cur_point[0] + delta_x_to_max_speed >= next_point[0]: + print("go to next point") + # accelerate to max speed + delta_t_to_next_x = compute_time_to_x( + cur_point[0], next_point[0], current_speed, acceleration + ) + cur_time += delta_t_to_next_x + cur_point = [next_point[0], 0] + current_speed += delta_t_to_next_x * acceleration + if flag: + cur_index += 1 + + # this is the case where we would reach max speed before the next point + # we need to create a new point where we would reach max speed + else: + print("adding new point") + # we would need to add a new point at max speed + cur_time += delta_t_to_max_speed + cur_point = [cur_point[0] + delta_x_to_max_speed, 0] + current_speed = max_speed + + # This is the case where we are at max speed + # special functionality is that this block must + # add a point where we would need to start declerating to reach + # the final point + elif current_speed == max_speed: + next_point = points[cur_index + 1] + # continue on with max speed + print("In case three") + + # add point to start decelerating + if next_point[0] + min_delta_x_stop >= points[-1][0]: + print("Adding new point to start decelerating") + cur_time += ( + points[-1][0] - min_delta_x_stop - cur_point[0] + ) / current_speed + cur_point = [points[-1][0] - min_delta_x_stop, 0] + current_speed = max_speed + else: + # Continue on to next point + print("Continuing on to next point") + cur_time += (next_point[0] - cur_point[0]) / current_speed + cur_point = next_point + cur_index += 1 + + # This is an edge case and should only be reach + # if the initial speed is greater than the max speed + elif current_speed > max_speed: + # We need to hit the breaks + acc = deceleration + flag = 1 + # next point + next_point = points[cur_index + 1] + if next_point[0] - cur_point[0] > rq * acc: + tmp = cur_point[0] + (next_point[0] - cur_point[0]) / (acc * multi) + flag = 0 + next_point = [tmp, next_point[1]] + print("In case four") + # slow down to max speed + delta_t_to_max_speed = (current_speed - max_speed) / deceleration + delta_x_to_max_speed = ( + current_speed * delta_t_to_max_speed + - 0.5 * deceleration * delta_t_to_max_speed**2 + ) + + # If we would reach the next point before slowing down to max speed + # keep going until we reach the next point + if cur_point[0] + delta_x_to_max_speed >= next_point[0]: + delta_t_to_next_x = compute_time_to_x( + cur_point[0], next_point[0], current_speed, -deceleration + ) + cur_time += delta_t_to_next_x + cur_point = [next_point[0], 0] + current_speed -= delta_t_to_next_x * deceleration + cur_index += 1 + else: + # We would reach max speed before the next point + # we need to add a new point at the point where we + # would reach max speed + cur_time += delta_t_to_max_speed + cur_point = [cur_point[0] + delta_x_to_max_speed, 0] + current_speed = max_speed + + else: + # not sure what falls here + raise ValueError("LONGITUDINAL PLAN ERROR: Not sure how we ended up here") + + new_points.append(cur_point) + new_times.append(cur_time) + velocities.append(current_speed) + + points = new_points + times = new_times + print("[PLAN] Computed points:", points) + print("[TIME] Computed time:", times) + print("[Velocities] Computed velocities:", velocities) + + # ============================================= + + trajectory = Trajectory(path.frame, points, times) + return trajectory + + +def longitudinal_brake( + path: Path, deceleration: float, current_speed: float +) -> Trajectory: + """Generates a longitudinal trajectory for braking along a path.""" + path_normalized = path.arc_length_parameterize() + points = [p for p in path_normalized.points] + times = [t for t in path_normalized.times] + + # ============================================= + + print("=====LONGITUDINAL BRAKE=====") + print("path length: ", path.length()) + length = path.length() + + x0 = points[0][0] + t_stop = current_speed / deceleration + x_stop = x0 + current_speed * t_stop - 0.5 * deceleration * t_stop**2 + + new_points = [] + velocities = [] + + for t in times: + if t <= t_stop: + x = x0 + current_speed * t - 0.5 * deceleration * t**2 + else: + x = x_stop + new_points.append([x, 0]) + velocities.append(current_speed - deceleration * t) + points = new_points + print("[BRAKE] Computed points:", points) + + # ============================================= + + trajectory = Trajectory(path.frame, points, times, velocities) + return trajectory + + +################################################################################ +########## Yield Trajectory Planner ############################################ +################################################################################ + + +########################## +##### Patrick's Code ##### +########################## + + +class YieldTrajectoryPlanner(Component): + """Follows the given route. Brakes if you have to yield or + you are at the end of the route, otherwise accelerates to + the desired speed. + """ + + def __init__( + self, + mode: str = "real", + params: dict = {"planner": "dt", "desired_speed": 1.0, "acceleration": 0.5}, + ): + self.route_progress = None + self.t_last = None + self.acceleration = 1.0 + self.desired_speed = 1.0 + self.deceleration = 2.0 + + self.min_deceleration = 1.0 + self.max_deceleration = 8.0 + + self.mode = mode + self.planner = params["planner"] + self.mission = None + + def state_inputs(self): + return ["all"] + + def state_outputs(self) -> List[str]: + return ["trajectory"] + + def rate(self): + return 10.0 + + def update(self, state: AllState): + start_time = time.time() + if self.mission == None: + self.mission = state.mission.type + + vehicle = state.vehicle # type: VehicleState + route = state.route # type: Route + t = state.t + + if self.t_last is None: + self.t_last = t + dt = t - self.t_last + + # Position in vehicle frame (Start (0,0) to (15,0)) + curr_x = vehicle.pose.x + curr_y = vehicle.pose.y + curr_v = vehicle.v + + abs_x = curr_x + state.start_vehicle_pose.x + abs_y = curr_y + state.start_vehicle_pose.y + + ############################################### + # print("@@@@@ VEHICLE STATE @@@@@") + # print(vehicle) + # print("@@@@@@@@@@@@@@@@@@@@@@@@@") + + if self.mode == "real": + abs_x = curr_x + abs_y = curr_y + ############################################### + + if state.mission.type == MissionEnum.IDLE: + return Trajectory( + times=[0, 0], frame=ObjectFrameEnum.START, points=[[0, 0]] + ) + + # figure out where we are on the route + if state.mission.type!=self.mission: + self.route_progress = None + self.mission = state.mission.type + + if self.route_progress is None: + self.route_progress = 0.0 + closest_dist, closest_parameter = state.route.closest_point_local( + (curr_x, curr_y), [self.route_progress - 5.0, self.route_progress + 5.0] + ) + self.route_progress = closest_parameter + + # lookahead_distance = max(10, curr_v**2 / (2 * self.deceleration)) + # print("Lookahead distance:", lookahead_distance) + + route_to_end = route.trim(closest_parameter, len(route.points) - 1) + + should_yield = False + yield_deceleration = 0.0 + + # print("Current Speed: ", curr_v) + + for r in state.relations: + if r.type == EntityRelationEnum.YIELDING and r.obj1 == "": + # get the object we are yielding to + obj = state.agents[r.obj2] + + detected, deceleration = detect_collision( + abs_x, + abs_y, + curr_v, + obj, + self.min_deceleration, + self.max_deceleration, + self.acceleration, + self.desired_speed, + ) + if isinstance(deceleration, list): + print("@@@@@ INPUT", deceleration) + time_collision = deceleration[1] + distance_collision = deceleration[0] + b = 3 * time_collision - 2 * curr_v + c = curr_v**2 - 3 * distance_collision + desired_speed = (-b + (b**2 - 4 * c) ** 0.5) / 2 + deceleration = 1.5 + print("@@@@@ YIELDING", desired_speed) + route_yield = route.trim( + closest_parameter, closest_parameter + distance_collision + ) + traj = longitudinal_plan( + route_yield, + self.acceleration, + deceleration, + desired_speed, + curr_v, + self.planner, + ) + return traj + else: + if detected and deceleration > 0: + yield_deceleration = deceleration + should_yield = True + + print("should yield: ", should_yield) + + should_accelerate = not should_yield and curr_v < self.desired_speed + + # choose whether to accelerate, brake, or keep at current velocity + if should_accelerate: + traj = longitudinal_plan( + route_to_end, + self.acceleration, + self.deceleration, + self.desired_speed, + curr_v, + self.planner, + ) + elif should_yield: + traj = longitudinal_brake(route_to_end, yield_deceleration, curr_v) + else: + traj = longitudinal_plan( + route_to_end, + 0.0, + self.deceleration, + self.desired_speed, + curr_v, + self.planner, + ) + + return traj \ No newline at end of file diff --git a/GEMstack/onboard/planning/route_planning_component.py b/GEMstack/onboard/planning/route_planning_component.py new file mode 100644 index 000000000..1eeebe090 --- /dev/null +++ b/GEMstack/onboard/planning/route_planning_component.py @@ -0,0 +1,484 @@ +import os +from typing import Dict, List + +import numpy as np +from GEMstack.onboard.component import Component +from GEMstack.state.agent import AgentState +from GEMstack.state.mission import MissionEnum +from GEMstack.state.all import AllState +from GEMstack.state.mission_plan import MissionPlan +from GEMstack.state.physical_object import ObjectFrameEnum, ObjectPose +from GEMstack.state.route import PlannerEnum, Route +from GEMstack.state.vehicle import VehicleState +from GEMstack.state.intent import VehicleIntentEnum +from .planner import optimized_kinodynamic_rrt_planning +from .map_utils import load_pgm_to_occupancy_grid +from .rrt_star import RRTStar +from typing import List +from ..component import Component +from ...utils import serialization +from ...state import Route, ObjectFrameEnum +import math +import requests + +import rospy +from sensor_msgs.msg import Image +from cv_bridge import CvBridge +from .occupancy_grid2 import OccupancyGrid2 +import cv2 + + +# Constants for planning +ORIGIN_PX = (190, 80) +SCALE_PX_PER_M = 6.5 + + +# Functions to dynamically calculate a circular or linear path around the inspection area +def is_inside_geofence(x, y, xmin, xmax, ymin, ymax): + return xmin < x < xmax and ymin < y < ymax + + +def max_visible_arc(circle_center, radius, geofence): + """Circular arc calculation around the inspection area.""" + xc, yc = circle_center + (xmin, ymin), (xmax, ymax) = geofence + + angles = np.linspace(0, 2 * np.pi, 500, endpoint=False) + arc_segments = [] + curr_segment = [] + + first_inside = last_inside = False + flag_full_circle = True + tangent_min = 0 + min_index = 0 + + for i, theta in enumerate(angles): + x = xc + radius * np.cos(theta) + y = yc + radius * np.sin(theta) + + inside = is_inside_geofence(x, y, xmin, xmax, ymin, ymax) + + if i == 0: + first_inside = inside + if i == len(angles) - 1: + last_inside = inside + + if inside: + curr_segment.append((x, y)) + # Calculate the tangent heading in a clockwise direction + tangent_heading = -np.arctan2(y - yc, x - xc) # Clockwise heading + if abs(1 - tangent_heading) > abs(1 - tangent_min): + if np.arctan2(yc, xc) < np.arctan2(y, x): + tangent_min = tangent_heading + min_index = i + else: + flag_full_circle = False + if curr_segment: + arc_segments.append(curr_segment) + curr_segment = [] + + if curr_segment: + arc_segments.append(curr_segment) + + # If arc wraps around from 2π back to 0, combine first and last segments + if first_inside and last_inside and len(arc_segments) > 1: + arc_segments[0] = arc_segments[-1] + arc_segments[0] + arc_segments.pop() + + if not arc_segments: + return [] + + max_arc = list(max(arc_segments, key=len)) + + if flag_full_circle: + max_arc = max_arc[min_index:] + max_arc[:min_index] + + return max_arc[::-1] + + +def heading(cx, cy, px, py): + """Calculate the heading of the vehicle wrt to a fixed point""" + dx = px - cx + dy = py - cy + tx = -dy + ty = dx + return math.degrees(math.atan2(ty, tx)) # Heading in radians + + +def create_path_around_inspection(inspection_area, geofence, margin=1.0): + """Linear path around the inspection area""" + (ixmin, iymin), (ixmax, iymax) = inspection_area + (gxmin, gymin), (gxmax, gymax) = geofence + + # Define top path + top_y = iymax + margin + bottom_y = iymin - margin + + top_possible = gymin < top_y < gymax + bottom_possible = gymin < bottom_y < gymax + + top_path = ( + [(ixmin - 2, top_y), (ixmax + 2, top_y)] if top_possible else None + ) + right_path = ( + [(ixmax + margin, top_y), (ixmax + margin, bottom_y)] if top_possible else None + ) + bottom_path = ( + [(ixmax + margin, bottom_y), (ixmin - margin, bottom_y)] + if bottom_possible + else None + ) + left_path = ( + [(ixmin - margin, bottom_y), (ixmin - margin, top_y)] + if bottom_possible + else None + ) + + if top_possible: + full_path = top_path + elif bottom_possible: + full_path = bottom_path + else: + full_path = top_path[:len(top_path)/2] # only half part of the top + + return full_path + + +def check_point_exists(vehicle, start_pose, server_url="https://summon-app-production.up.railway.app"): + """Querying the frontend to get the inspection area.""" + print("Vehicle pose frame", vehicle.pose.frame) + try: + response = requests.get(f"{server_url}/api/inspect") + response.raise_for_status() + points = response.json().get("coords", []) + + if points: + pt1 = ObjectPose( + frame=ObjectFrameEnum.GLOBAL, + t=0, + x=points[0]["lon"], + y=points[0]["lat"], + ) + pt2 = ObjectPose( + frame=ObjectFrameEnum.GLOBAL, + t=0, + x=points[1]["lon"], + y=points[1]["lat"], + ) + pt1 = pt1.to_frame(ObjectFrameEnum.START, start_pose_abs=start_pose) + pt2 = pt2.to_frame(ObjectFrameEnum.START, start_pose_abs=start_pose) + return True, [[pt1.x, pt1.y], [pt2.x, pt2.y]] + return False, [] + + except requests.exceptions.RequestException as e: + print("Error contacting server:", e) + return False, [] + + +class InspectRoutePlanner(Component): + """Inspection route planner that controls the state transition logic for the vertical behavior of the vehicle + while inspection.""" + + def __init__(self, state_machine, frame: str = "start"): + self.geofence_area = [[-40, -40], [40, 40]] + self.state_list = state_machine + self.index = 0 + self.mission = self.state_list[self.index] + self.circle_center = [10,5] + self.radius = 2 + self.start = [0, 0] + self.bridge = CvBridge() + self.img_pub = rospy.Publisher("/image_with_car_xy", Image, queue_size=1) + self.occupancy_grid = OccupancyGrid2() + self.planned_path_already = False + self.x = None + + def state_inputs(self): + return ["all"] + + def state_outputs(self) -> List[str]: + return ["route", "mission"] + + def rate(self): + return 2.0 + + def _car_to_pixel(self, x, y, img_w, img_h): + # (x,y)[m] → (u,v)[px] + u0, v0 = ORIGIN_PX + u = int(round(u0 + SCALE_PX_PER_M * x)) + v = int(round(v0 - SCALE_PX_PER_M * y)) + + # clamp to image bounds + u = max(0, min(u, img_w - 1)) + v = max(0, min(v, img_h - 1)) + return u, v + + def _pixel_to_car(self, u, v, img_w, img_h): + # clamp to image bounds + u = max(0, min(u, img_w - 1)) + v = max(0, min(v, img_h - 1)) + + # origin and scale (same as in _car_to_pixel) + u0, v0 = ORIGIN_PX + scale = SCALE_PX_PER_M + + x = (u - u0) / scale + y = (v0 - v) / scale + + return x, y + + def visualize_route_pixels(self, route_pts, start_pt, goal_pt): + """ + route_pts: list of (u, v) in pixels + start_pt: (u, v) in pixels + goal_pt: (u, v) in pixels + """ + # 1. Copy the base image + script_dir = os.path.dirname(os.path.abspath(__file__)) + frame_path = os.path.join(script_dir, "out.pgm") + frame = cv2.imread(frame_path, cv2.IMREAD_COLOR) + img_h, img_w = frame.shape[:2] + + # 2. Optionally clamp all points to image bounds + def clamp(pt): + u, v = pt + u = max(0, min(int(round(u)), img_w - 1)) + v = max(0, min(int(round(v)), img_h - 1)) + return (u, v) + + pts = np.array([clamp(p) for p in route_pts], dtype=np.int32).reshape(-1, 1, 2) + + # 3. Draw the route polyline in green + cv2.polylines(frame, [pts], isClosed=False, color=(0, 255, 0), thickness=2) + + # 4. Draw start marker in blue (star) + u_s, v_s = clamp(start_pt) + cv2.drawMarker( + frame, + (u_s, v_s), + color=(255, 0, 0), + markerType=cv2.MARKER_STAR, + markerSize=20, + thickness=3, + ) + + # 5. Draw goal marker in red (tilted cross) + u_g, v_g = clamp(goal_pt) + cv2.drawMarker( + frame, + (u_g, v_g), + color=(0, 0, 255), + markerType=cv2.MARKER_TILTED_CROSS, + markerSize=20, + thickness=3, + ) + + # 6. Publish via ROS + out = self.bridge.cv2_to_imgmsg(frame, "bgr8") + # print("Publishing out: ", out) + out.header.stamp = rospy.Time.now() + self.img_pub.publish(out) + + def rrt_route(self, state, goal_start_pose): + ## Step. 1 Convert vehicle pose to global frame + vehicle_global_pose = state.vehicle.pose.to_frame( + ObjectFrameEnum.GLOBAL, start_pose_abs=state.start_vehicle_pose + ) + self.occupancy_grid.gnss_to_image( + vehicle_global_pose.x, vehicle_global_pose.y + ) + + ## Step 2: Get start image coordinates aka position of vehicle in image + start_x, start_y = self.occupancy_grid.gnss_to_image_coords( + vehicle_global_pose.x, vehicle_global_pose.y + ) + start_yaw = vehicle_global_pose.yaw + print("Start image coordinates", start_x, start_y, "yaw", start_yaw) + + ## Step 3. Convert goal to global frame + goal_global_pose = goal_start_pose.to_frame( + ObjectFrameEnum.GLOBAL, start_pose_abs=state.start_vehicle_pose + ) + goal_x, goal_y = self.occupancy_grid.gnss_to_image_coords( + goal_global_pose.x, goal_global_pose.y + ) + goal_yaw = goal_global_pose.yaw + print("Goal image coordinates", goal_x, goal_y, "yaw", goal_yaw) + + script_dir = os.path.dirname(os.path.abspath(__file__)) + map_path = os.path.join(script_dir, "highbay_image.pgm") + print("map_path", map_path) + + map_img = cv2.imread(map_path, cv2.IMREAD_UNCHANGED) + occupancy_grid = (map_img > 0).astype( + np.uint8 + ) + self.t_last = None + self.bounds = (0, occupancy_grid.shape[1]) + + script_dir = os.path.dirname(os.path.abspath(__file__)) + map_path = os.path.join(script_dir, "highbay_image.pgm") + + start_w = [start_y, start_x, start_yaw] + goal_w = [goal_y, goal_x, goal_yaw] + + path = optimized_kinodynamic_rrt_planning(start_w, goal_w, occupancy_grid) + + waypoints = [] + for i in range(len(path)): + x, y, theta = path[i] + # Convert to car coordinates + waypoint_lat, waypoint_lon = self.occupancy_grid.image_to_gnss( + y, x + ) + # Convert global to start frame + waypoint_global_pose = ObjectPose( + frame=ObjectFrameEnum.GLOBAL, + t=state.start_vehicle_pose.t, + x=waypoint_lon, + y=waypoint_lat, + yaw=theta, + ) + waypoint_start_pose = waypoint_global_pose.to_frame( + ObjectFrameEnum.START, start_pose_abs=state.start_vehicle_pose + ) + waypoints.append((waypoint_start_pose.x, waypoint_start_pose.y)) + waypoint_global_pose = ObjectPose( + frame=ObjectFrameEnum.GLOBAL, + t=state.start_vehicle_pose.t, + x=waypoint_lon, + y=waypoint_lat, + yaw=theta, + ) + waypoint_start_pose = waypoint_global_pose.to_frame( + ObjectFrameEnum.START, start_pose_abs=state.start_vehicle_pose + ) + waypoints.append((waypoint_start_pose.x, waypoint_start_pose.y)) + + return Route(frame=ObjectFrameEnum.START, points=waypoints) + + def update(self, state): + # Default route to ensure that the IDLE state does not run into error + self.route = Route(frame=ObjectFrameEnum.START, points=((0, 0, 0))) + + print("Mode ", state.mission.type) + print("Mission state:", self.mission) + + if self.mission == "IDLE": + state.mission.type = MissionEnum.IDLE + points_found, pts = check_point_exists( + state.vehicle, state.start_vehicle_pose + ) + if points_found: + self.inspection_area = pts + print("Inspection coordinates:", self.inspection_area) + print(self.state_list[self.index + 1]) + self.mission = self.state_list[self.index + 1] + self.index += 1 + print("CHANGING STATES", self.mission) + self.start = [state.vehicle.pose.x, state.vehicle.pose.y] + + ## Inspection through a circular arc + # self.circle_center = [ + # (self.inspection_area[0][0] + self.inspection_area[1][0]) / 2, + # (self.inspection_area[0][1] + self.inspection_area[1][1]) / 2, + # ] + # self.radius = ( + # (self.inspection_area[0][0] + self.inspection_area[1][0]) ** 2 + # + (self.inspection_area[0][1] + self.inspection_area[1][1]) ** 2 + # ) ** 0.5 / 2 + # self.inspection_route = max_visible_arc( + # self.circle_center, self.radius, self.geofence_area + # ) + + ## Inspection through a linear path + self.inspection_route = create_path_around_inspection( + self.inspection_area, self.geofence_area + ) + + elif self.mission == "NAV": + state.mission.type = MissionEnum.DRIVE + + start = (state.vehicle.pose.x, state.vehicle.pose.y) + goal = ObjectPose( + frame=ObjectFrameEnum.START, + t=state.start_vehicle_pose.t, + x=self.inspection_route[0][0], + y=self.inspection_route[0][1], + yaw=0, + ) + print("Current Position: ", start) + print("Goal Position: ", goal) + + if self.planned_path_already == False: + ## Ensure that RRT star does not run for every iteration of the component + self.x = self.rrt_route(state, goal) + self.planned_path_already = True + self.route = self.x + + ## GOAL condition + if (abs(state.vehicle.pose.x - goal.x) <= 3 and abs(state.vehicle.pose.y - goal.y) <= 3): + print(self.state_list[self.index + 1]) + self.mission = self.state_list[self.index + 1] + self.index += 1 + print("CHANGING STATES", self.mission) + self.planned_path_already = False + + elif self.mission == "INSPECT": + state.mission.type = MissionEnum.INSPECT + start = (state.vehicle.pose.x, state.vehicle.pose.y) + goal = (self.inspection_route[-1][0], self.inspection_route[-1][1]) + + self.route = Route( + frame=ObjectFrameEnum.START, points=self.inspection_route + ) + + ## GOAL condition + if (abs(state.vehicle.pose.x - goal[0]) <= 3 and abs(state.vehicle.pose.y - goal[1]) <= 3): + print(self.state_list[self.index + 1]) + self.mission = self.state_list[self.index + 1] + self.index += 1 + print("CHANGING STATES", self.mission) + + + ## Camera trigger logic + if ( + heading( + self.circle_center[0], + self.circle_center[1], + state.vehicle.pose.x, + state.vehicle.pose.y, + ) + * state.vehicle.pose.yaw + > 0 + ): + state.intent = VehicleIntentEnum.CAMERA_FR + else: + state.intent = VehicleIntentEnum.CAMERA_RR + + elif self.mission == "FINISH": + state.mission.type = MissionEnum.HOME + goal = ObjectPose( + frame=ObjectFrameEnum.START, + t=state.start_vehicle_pose.t, + x=0, + y=0, + yaw=0, + ) + print("Goal Position: ", goal) + + if self.planned_path_already == False: + ## Ensure that RRT star does not run for every iteration of the component + self.x = self.rrt_route(state, goal) + self.planned_path_already = True + self.route = self.x + + ## GOAL condition + if (abs(state.vehicle.pose.x - goal.x) <= 3 and abs(state.vehicle.pose.y - goal.y) <= 3): + print(self.state_list[self.index + 1]) + self.mission = self.state_list[self.index + 1] + self.index += 1 + print("CHANGING STATES", self.mission) + + print("-------------------------------------------------") + return [self.route, state.mission] diff --git a/GEMstack/scripts/__init__.py b/GEMstack/scripts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/GEMstack/scripts/geotag.py b/GEMstack/scripts/geotag.py new file mode 100644 index 000000000..d9b03a213 --- /dev/null +++ b/GEMstack/scripts/geotag.py @@ -0,0 +1,167 @@ +import rosbag +from cv_bridge import CvBridge +import cv2 +from PIL import Image +import piexif +import numpy as np +import math +import os +from datetime import datetime + +def rad_to_deg(rad): + """Convert radians to degrees""" + return rad * 180.0 / math.pi + +def interpolate_position(gnss_times, gnss_data, target_time): + """ + Interpolate position using GNSS data + """ + # Find the two closest GNSS messages + idx = np.searchsorted(gnss_times, target_time) + if idx == 0: + lat_rad = gnss_data[0].latitude + lon_rad = gnss_data[0].longitude + return rad_to_deg(lat_rad), rad_to_deg(lon_rad) + if idx == len(gnss_times): + lat_rad = gnss_data[-1].latitude + lon_rad = gnss_data[-1].longitude + return rad_to_deg(lat_rad), rad_to_deg(lon_rad) + + # Get timestamps and data for interpolation + t0, t1 = gnss_times[idx-1], gnss_times[idx] + p0 = gnss_data[idx-1] + p1 = gnss_data[idx] + + # Calculate interpolation factor + alpha = (target_time - t0) / (t1 - t0) + + # Linear interpolation for position (in radians) + lat_rad = p0.latitude + alpha * (p1.latitude - p0.latitude) + lon_rad = p0.longitude + alpha * (p1.longitude - p0.longitude) + + # Convert to degrees + return rad_to_deg(lat_rad), rad_to_deg(lon_rad) + +def convert_gps_to_exif_format(latitude, longitude, altitude): + """Convert GPS coordinates and altitude to EXIF format""" + def decimal_to_dms(decimal): + decimal = float(decimal) + degrees = int(decimal) + minutes = int((decimal - degrees) * 60) + seconds = ((decimal - degrees) * 60 - minutes) * 60 + return (degrees, 1), (minutes, 1), (int(seconds * 1000), 1000) + + lat_dms = decimal_to_dms(abs(latitude)) + lon_dms = decimal_to_dms(abs(longitude)) + lat_ref = 'N' if latitude >= 0 else 'S' + lon_ref = 'E' if longitude >= 0 else 'W' + + # Convert altitude to EXIF format (rational number) + alt_ref = 0 if altitude >= 0 else 1 # 0 = above sea level, 1 = below sea level + altitude = abs(altitude) + alt_ratio = (int(altitude * 100), 100) # Multiply by 100 for 2 decimal precision + + return lat_dms, lon_dms, lat_ref, lon_ref, alt_ratio, alt_ref + +# Create images directory if it doesn't exist +os.makedirs('images', exist_ok=True) + +# Open the bag file +bag = rosbag.Bag('fr+gnss+lidar.bag') +bridge = CvBridge() + +# First pass: collect and sort GNSS data +print("Collecting GNSS data...") +gnss_times = [] +gnss_messages = [] + +for topic, msg, t in bag.read_messages(topics=['/septentrio_gnss/insnavgeod']): + gnss_times.append(t.to_sec()) + gnss_messages.append(msg) + +# Convert to numpy array for efficient searching +gnss_times = np.array(gnss_times) + +print(f"Collected {len(gnss_messages)} GNSS messages") + +# Process images with interpolated position data +print("Processing images...") +image_count = 0 +for topic, msg, t in bag.read_messages(topics=['/camera_fl/arena_camera_node/image_raw']): + # Convert ROS image to OpenCV image + cv_img = bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') # Explicitly request BGR + + # Convert BGR to RGB + rgb_img = cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB) + + # Convert to PIL Image + pil_img = Image.fromarray(rgb_img) + + # Get interpolated position data + image_time = t.to_sec() + lat, lon = interpolate_position(gnss_times, gnss_messages, image_time) + + # Get altitude from GNSS message (you'll need to interpolate this as well) + altitude = gnss_messages[np.searchsorted(gnss_times, image_time)].height + + # Convert GPS coordinates to EXIF format + lat_dms, lon_dms, lat_ref, lon_ref, alt_ratio, alt_ref = convert_gps_to_exif_format(lat, lon, altitude) + + timestamp = datetime.fromtimestamp(t.to_sec()) + + exif_dict = { + "0th": { + piexif.ImageIFD.Make: "FLIR".encode(), + piexif.ImageIFD.Model: "Blackfly S BFS-U3-16S2C".encode(), + piexif.ImageIFD.Software: "ROS".encode(), + piexif.ImageIFD.DateTime: timestamp.strftime("%Y:%m:%d %H:%M:%S").encode(), + piexif.ImageIFD.ImageDescription: "Captured with ROS and FLIR Blackfly S".encode(), + piexif.ImageIFD.XResolution: (msg.width, 1), + piexif.ImageIFD.YResolution: (msg.height, 1), + piexif.ImageIFD.ResolutionUnit: 2, # inches + }, + "Exif": { + piexif.ExifIFD.DateTimeOriginal: timestamp.strftime("%Y:%m:%d %H:%M:%S").encode(), + piexif.ExifIFD.DateTimeDigitized: timestamp.strftime("%Y:%m:%d %H:%M:%S").encode(), + piexif.ExifIFD.ExposureTime: (1, 100), # 1/100 second + piexif.ExifIFD.FNumber: (16, 10), # f/1.6 + piexif.ExifIFD.ExposureProgram: 1, # Manual + piexif.ExifIFD.ISOSpeedRatings: 100, + piexif.ExifIFD.ExifVersion: b'0230', + piexif.ExifIFD.ComponentsConfiguration: b'\x01\x02\x03\x00', # RGB + piexif.ExifIFD.FocalLength: (16, 1), # 16mm + piexif.ExifIFD.ColorSpace: 1, # sRGB + piexif.ExifIFD.PixelXDimension: msg.width, + piexif.ExifIFD.PixelYDimension: msg.height, + piexif.ExifIFD.ExposureMode: 1, # Manual exposure + piexif.ExifIFD.WhiteBalance: 1, # Manual white balance + piexif.ExifIFD.SceneCaptureType: 0, # Standard + }, + "GPS": { + piexif.GPSIFD.GPSLatitudeRef: lat_ref.encode(), + piexif.GPSIFD.GPSLatitude: lat_dms, + piexif.GPSIFD.GPSLongitudeRef: lon_ref.encode(), + piexif.GPSIFD.GPSLongitude: lon_dms, + piexif.GPSIFD.GPSAltitudeRef: alt_ref, + piexif.GPSIFD.GPSAltitude: alt_ratio, + piexif.GPSIFD.GPSTimeStamp: tuple(map(lambda x: (int(x), 1), timestamp.strftime("%H:%M:%S").split(":"))), + piexif.GPSIFD.GPSDateStamp: timestamp.strftime("%Y:%m:%d").encode(), + piexif.GPSIFD.GPSVersionID: (2, 3, 0, 0), + } +} + + # Convert to bytes + exif_bytes = piexif.dump(exif_dict) + + # Save image with EXIF data + output_filename = os.path.join('images', f'image_{image_time:.3f}.jpg') + pil_img.save( + output_filename, + 'jpeg', + exif=exif_bytes + ) + + image_count += 1 + +bag.close() +print(f"\nProcessing complete! Saved {image_count} images to the 'images' folder.") diff --git a/GEMstack/scripts/register_lidar_scans.py b/GEMstack/scripts/register_lidar_scans.py new file mode 100644 index 000000000..78be224a2 --- /dev/null +++ b/GEMstack/scripts/register_lidar_scans.py @@ -0,0 +1,296 @@ +import rosbag +import numpy as np +import open3d as o3d +import math +import copy +from scipy.spatial.transform import Rotation +from sensor_msgs import point_cloud2 +import pyproj + +# Define the coordinate transformations +wgs84 = pyproj.CRS('EPSG:4326') # WGS84 latitude/longitude +utm16N = pyproj.CRS('EPSG:32616') # UTM zone 16N +transformer = pyproj.Transformer.from_crs(wgs84, utm16N, always_xy=True) + +def interpolate_position(gnss_times, gnss_data, target_time): + """Interpolate position using GNSS data""" + idx = np.searchsorted(gnss_times, target_time) + if idx == 0: + return gnss_data[0] + if idx == len(gnss_times): + return gnss_data[-1] + + t0, t1 = gnss_times[idx-1], gnss_times[idx] + p0 = gnss_data[idx-1] + p1 = gnss_data[idx] + + alpha = (target_time - t0) / (t1 - t0) + + class InterpolatedGNSS: + pass + + msg = InterpolatedGNSS() + msg.latitude = p0.latitude + alpha * (p1.latitude - p0.latitude) + msg.longitude = p0.longitude + alpha * (p1.longitude - p0.longitude) + msg.height = p0.height + alpha * (p1.height - p0.height) + msg.roll = p0.roll + alpha * (p1.roll - p0.roll) + msg.pitch = p0.pitch + alpha * (p1.pitch - p0.pitch) + msg.heading = p0.heading + alpha * (p1.heading - p0.heading) + + return msg + + +def create_transformation_matrix(gnss_msg): + """Create 4x4 transformation matrix from GNSS data using UTM coordinates""" + # Convert lat/lon from radians to degrees + lon_deg = math.degrees(gnss_msg.longitude) + lat_deg = math.degrees(gnss_msg.latitude) + + try: + easting, northing = transformer.transform(lon_deg, lat_deg) + except Exception as e: + print(f"Error in UTM transformation: {e}") + return np.eye(4) + + # Store first position as origin + if not hasattr(create_transformation_matrix, "origin"): + create_transformation_matrix.origin = (easting, northing, gnss_msg.height) + + # Calculate position relative to origin in meters + dx = easting - create_transformation_matrix.origin[0] + dy = northing - create_transformation_matrix.origin[1] + dz = gnss_msg.height - create_transformation_matrix.origin[2] + + # Base transformation to align coordinate systems + base_rotation = np.array([ + [0, -1, 0], # x forward + [1, 0, 0], # y right + [0, 0, 1] # z up + ]) + + # Convert heading to radians and create rotation matrix + # Only using heading, setting pitch and roll to 0 + heading_rad = math.radians(gnss_msg.heading) + r = Rotation.from_euler('z', -heading_rad, degrees=False) + rotation_matrix = r.as_matrix() + + # Combine the rotations + final_rotation = rotation_matrix @ base_rotation + + # Create 4x4 transformation matrix + transform = np.eye(4) + transform[:3, :3] = final_rotation + transform[:3, 3] = [dx, dy, dz] + + return transform + +def preprocess_point_cloud(pcd, voxel_size): + """Preprocess point cloud for registration""" + # Voxel downsampling + pcd_down = pcd.voxel_down_sample(voxel_size=voxel_size) + if pcd_down is None or len(pcd_down.points) == 0: + return None + + # Remove outliers + cl, ind = pcd_down.remove_radius_outlier(nb_points=16, radius=0.5) + if cl is None or len(cl.points) == 0: + return None + + # Estimate normals + cl.estimate_normals( + o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30)) + + return cl + +def register_point_clouds(source, target, initial_transform=np.eye(4), voxel_size=0.1): + """Register two point clouds using ICP""" + source_down = preprocess_point_cloud(source, voxel_size) + target_down = preprocess_point_cloud(target, voxel_size) + + if source_down is None or target_down is None: + return initial_transform + + result = o3d.pipelines.registration.registration_icp( + source_down, target_down, voxel_size * 2, initial_transform, + o3d.pipelines.registration.TransformationEstimationPointToPlane(), + o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=50)) + + return result.transformation + +print("Opening bag file...") +bag = rosbag.Bag('fr+gnss+lidar.bag') + +# Collect GNSS messages +print("Collecting GNSS data...") +gnss_data = [] +gnss_times = [] + +for topic, msg, t in bag.read_messages(topics=['/septentrio_gnss/insnavgeod']): + gnss_times.append(t.to_sec()) + gnss_data.append(msg) + +gnss_times = np.array(gnss_times) +print(f"Collected {len(gnss_data)} GNSS messages") + +print("\nFirst few GNSS messages (converted):") +for i in range(min(5, len(gnss_data))): + msg = gnss_data[i] + print(f"\nGNSS Message {i}:") + print(f"Latitude: {math.degrees(msg.latitude)} degrees (converted from {msg.latitude} rad)") + print(f"Longitude: {math.degrees(msg.longitude)} degrees (converted from {msg.longitude} rad)") + print(f"Height: {msg.height} meters") + print(f"Roll: {msg.roll} rad") + print(f"Pitch: {msg.pitch} rad") + print(f"Heading: {msg.heading} degrees") + + +# First pass: Collect and preprocess all scans with initial UTM transformation +print("Processing LiDAR scans...") +scans = [] +scan_count = 0 + +for topic, msg, t in bag.read_messages(topics=['/ouster/points']): + # Get interpolated GNSS data + image_time = t.to_sec() + gnss_msg = interpolate_position(gnss_times, gnss_data, image_time) + + # Extract points + pc_data = point_cloud2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True) + points = np.array(list(pc_data)) + + if len(points) == 0: + continue + + # Create point cloud + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(points) + + # Preprocess point cloud + processed_pcd = preprocess_point_cloud(pcd, voxel_size=0.1) + if processed_pcd is None: + continue + + # Get initial transformation from GNSS in UTM coordinates + utm_transform = create_transformation_matrix(gnss_msg) + + scans.append((image_time, processed_pcd, utm_transform)) + scan_count += 1 + if scan_count % 50 == 0: # Changed from 10 to 50 to reduce output + print(f"Processed {scan_count} scans") + +print(f"Total scans processed: {scan_count}") + +if len(scans) == 0: + print("No valid scans collected!") + bag.close() + exit() + +# Second pass: Progressive scan registration +print("Performing scan registration...") +registered_scans = [] + +# Start with the first scan +registered_scans.append((scans[0][1], scans[0][2])) +print(f"First scan points: {len(scans[0][1].points)}") + +# Register subsequent scans +for i in range(1, len(scans)): + print(f"Registering scan {i}/{len(scans)-1}...") + + _, current_scan, current_utm = scans[i] + print(f"Current scan points: {len(current_scan.points)}") + + # Get initial alignment from UTM + initial_transform = np.linalg.inv(scans[i-1][2]) @ current_utm + + # Perform ICP with previous scan + refined_transform = register_point_clouds( + current_scan, + registered_scans[-1][0], + initial_transform=initial_transform, + voxel_size=0.1 + ) + + # Update global transform + scan_global_transform = registered_scans[-1][1] @ refined_transform + + # Store registered scan + registered_scans.append((current_scan, scan_global_transform)) + + # ... (previous imports and functions remain the same) ... + +# Combine registered scans +print("Combining registered scans...") +combined_pcd = o3d.geometry.PointCloud() + +# Get the first GNSS message for absolute UTM coordinates +first_gnss = gnss_data[0] +lon_deg = math.degrees(first_gnss.longitude) +lat_deg = math.degrees(first_gnss.latitude) +print(f"First GNSS message (converted): lat={lat_deg}, lon={lon_deg}") + +try: + first_easting, first_northing = transformer.transform(lon_deg, lat_deg) + print(f"First point UTM: E={first_easting}, N={first_northing}") +except Exception as e: + print(f"Error converting first point to UTM: {e}") + first_easting, first_northing = 0, 0 + +all_points = [] +for i, (scan, transform) in enumerate(registered_scans): + print(f"\nProcessing scan {i}...") + + # Transform scan to global coordinate frame + scan_transformed = copy.deepcopy(scan) + + # Apply registration transform + scan_transformed.transform(transform) + + # Convert points to numpy array for easier manipulation + points = np.asarray(scan_transformed.points) + + # Check for invalid points + valid_mask = ~np.any(np.isnan(points) | np.isinf(points), axis=1) + if not np.all(valid_mask): + print(f"Removing {np.sum(~valid_mask)} invalid points from scan {i}") + points = points[valid_mask] + + if len(points) > 0: + # Add UTM offset without using transform + points = points + np.array([first_easting, first_northing, first_gnss.height]) + + # Check for invalid points after offset + valid_mask = ~np.any(np.isnan(points) | np.isinf(points), axis=1) + points = points[valid_mask] + + if len(points) > 0: + all_points.append(points) + print(f"Added {len(points)} valid points from scan {i}") + +# Combine all points +if all_points: + combined_points = np.vstack(all_points) + print(f"\nTotal combined points: {len(combined_points)}") + + # Create final point cloud + final_pcd = o3d.geometry.PointCloud() + final_pcd.points = o3d.utility.Vector3dVector(combined_points) + + # Optional: Downsample to reduce size + final_pcd = final_pcd.voxel_down_sample(voxel_size=0.1) + + # Optional: Remove statistical outliers + final_pcd, _ = final_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0) + + print(f"Final point cloud size after processing: {len(final_pcd.points)}") + + # Save result + print("\nSaving point cloud...") + try: + o3d.io.write_point_cloud("utm16N_registered_lidar.ply", final_pcd) + np.savetxt("utm16N_registered_lidar.xyz", np.asarray(final_pcd.points)) + print("Successfully saved point clouds") + except Exception as e: + print(f"Error saving point cloud: {e}") +else: + print("No valid points to combine!") diff --git a/GEMstack/state/all.py b/GEMstack/state/all.py index fc976fe24..457628cd2 100644 --- a/GEMstack/state/all.py +++ b/GEMstack/state/all.py @@ -7,6 +7,7 @@ from .agent_intent import AgentIntent,AgentIntentMixture from .relations import EntityRelation from .mission import MissionObjective +from .mission_plan import MissionPlan from .route import Route from .trajectory import Trajectory from .predicates import PredicateValues @@ -23,13 +24,15 @@ class AllState(SceneState): agent_intents : Dict[str,AgentIntentMixture] = field(default_factory=dict) relations : List[EntityRelation] = field(default_factory=list) predicates : PredicateValues = field(default_factory=PredicateValues) - + # planner-output state mission : MissionObjective = field(default_factory=MissionObjective) + mission_plan: MissionPlan = None intent : VehicleIntent = field(default_factory=VehicleIntent) + # planner_type : Optional[PlannerEnum] = None route : Optional[Route] = None trajectory : Optional[Trajectory] = None - + # update times for perception items (time.time()) vehicle_update_time : float = 0 roadgraph_update_time : float = 0 @@ -52,11 +55,11 @@ def zero(): scene_zero = SceneState.zero() keys = dict((k.name,getattr(scene_zero,k.name)) for k in fields(scene_zero)) return AllState(**keys) - + def to_frame(self, frame : ObjectFrameEnum) -> AllState: spose = self.start_vehicle_pose scene_to_frame = SceneState.to_frame(self,frame,current_pose=self.vehicle.pose,start_pose_abs=spose) new_intents = None if self.agent_intents is None else dict((k,v.to_frame(frame,current_pose=self.vehicle.pose,start_pose_abs=spose)) for k,v in self.agent_intents.items()) new_route = None if self.route is None else self.route.to_frame(frame,current_pose=self.vehicle.pose,start_pose_abs=spose) new_trajectory = None if self.trajectory is None else self.trajectory.to_frame(frame,current_pose=self.vehicle.pose,start_pose_abs=spose) - return replace(scene_to_frame, agent_intents = new_intents, route = new_route, trajectory = new_trajectory) \ No newline at end of file + return replace(scene_to_frame, agent_intents = new_intents, route = new_route, trajectory = new_trajectory) diff --git a/GEMstack/state/intent.py b/GEMstack/state/intent.py index da9c7d7ab..e9b69b2ae 100644 --- a/GEMstack/state/intent.py +++ b/GEMstack/state/intent.py @@ -14,6 +14,8 @@ class VehicleIntentEnum(Enum): PARKING = 8 # normal driving, executing parking behavior LEAVING_PARKING = 9 # normal driving, leaving a parking spot U_TURN = 10 # normal driving, executing U-turn outside of dedicated lane + CAMERA_FR = 11 # Capture only Front right camera images + CAMERA_RR = 12 # Capture only Rear right camera images @dataclass diff --git a/GEMstack/state/mission.py b/GEMstack/state/mission.py index 5ca1adeff..f7de00b9e 100644 --- a/GEMstack/state/mission.py +++ b/GEMstack/state/mission.py @@ -1,6 +1,7 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field, field from ..utils.serialization import register from enum import Enum +from typing import List class MissionEnum(Enum): IDLE = 0 # not driving, no mission @@ -9,9 +10,10 @@ class MissionEnum(Enum): TELEOP = 3 # manual teleop control RECOVERY_STOP = 4 # abnormal condition detected, must stop now ESTOP = 5 # estop pressed, must stop now + INSPECT = 6 + INSPECT_UPLOAD = 7 @dataclass @register class MissionObjective: type : MissionEnum = MissionEnum.IDLE - \ No newline at end of file diff --git a/GEMstack/state/mission_plan.py b/GEMstack/state/mission_plan.py new file mode 100644 index 000000000..f32ea2639 --- /dev/null +++ b/GEMstack/state/mission_plan.py @@ -0,0 +1,19 @@ +from dataclasses import dataclass +from ..utils.serialization import register +from .route import PlannerEnum +from typing import Optional +from GEMstack.state.physical_object import ObjectPose + +@dataclass +@register +class MissionPlan: + goal_x: float + goal_y: float + goal_orientation: float + planner_type : PlannerEnum = PlannerEnum.RRT_STAR + goal_vehicle_pose : Optional[ObjectPose] = None + start_vehicle_pose : Optional[ObjectPose] = None + # other mission-specific parameters can be added here + + + \ No newline at end of file diff --git a/GEMstack/state/route.py b/GEMstack/state/route.py index ce373d7a4..fb9105123 100644 --- a/GEMstack/state/route.py +++ b/GEMstack/state/route.py @@ -2,17 +2,28 @@ from ..utils.serialization import register from .physical_object import ObjectFrameEnum, convert_point from .trajectory import Path -from typing import List,Tuple,Optional +from typing import List, Tuple, Optional + +from enum import Enum + + +class PlannerEnum(Enum): + RRT_STAR = 0 # position / yaw in m / radians relative to starting pose of vehicle + HYBRID_A_STAR = ( + 1 # position / yaw in m / radians relative to current pose of vehicle + ) + PARKING = 2 # position in longitude / latitude, yaw=heading in radians with respect to true north (used in GNSS) + @dataclass @register class Route(Path): """A sequence of waypoints and lanes that the motion planner will attempt to follow. Usually the path connects the centerlines of the given lanes. - + Unlike a Path, for the planner's convenience, the route should also extract out the wait lines (stop lines, crossings) from the roadgraph. """ - lanes : List[str] = field(default_factory=list) - wait_lines : List[str] = field(default_factory=list) + lanes: List[str] = field(default_factory=list) + wait_lines: List[str] = field(default_factory=list) diff --git a/GEMstack/state/trajectory.py b/GEMstack/state/trajectory.py index d7a57db00..c74145e82 100644 --- a/GEMstack/state/trajectory.py +++ b/GEMstack/state/trajectory.py @@ -89,7 +89,7 @@ def closest_point(self, x : List[float], edges = True) -> Tuple[float,float]: best_dist = float('inf') best_point = None for i,p in enumerate(self.points): - if edges and i > 0: + if edges and i > 0: p1 = self.points[i-1] p2 = p dist,u = transforms.point_segment_distance(x,p1,p2) @@ -118,13 +118,13 @@ def closest_point_local(self, x : List[float], param_range=Tuple[float,float], e imax = int(math.floor(param_range[1])) if imax == len(self.points): imax -= 1 - + umin = param_range[0] - imin umax = param_range[1] - imax best_point = None for i in range(imin,imax+1): p = self.points[i] - if edges and i > 0: + if edges and i > 0: p1 = self.points[i-1] p2 = p dist,u = transforms.point_segment_distance(x,p1,p2) @@ -154,7 +154,7 @@ def append_dim(self, value : Union[float,List[float]] = 0.0) -> None: raise ValueError("Invalid length of values to append") for p,v in zip(self.points,value): p.append(v) - + def trim(self, start : float, end : float) -> Path: """Returns a copy of this path but trimmed to the given parameter range.""" sind,su = self.parameter_to_index(start) @@ -169,6 +169,7 @@ def trim(self, start : float, end : float) -> Path: class Trajectory(Path): """A timed, piecewise linear path.""" times : List[float] + velocities : Optional[List[float]] = None def domain(self) -> Tuple[float,float]: """Returns the time parameter domain""" @@ -185,12 +186,12 @@ def time_to_index(self, t : float) -> Tuple[int,float]: if ind >= len(self.times): return len(self.points)-2,1.0 u = (t - self.times[ind-1])/(self.times[ind] - self.times[ind-1]) return ind-1,u - + def time_to_parameter(self, t : float) -> float: """Converts a time to a parameter.""" ind,u = self.time_to_index(t) return ind+u - + def parameter_to_time(self, u : float) -> float: """Converts a parameter to a time""" if len(self.points) < 2: @@ -244,7 +245,7 @@ def closest_point(self, x : List[float], edges = True) -> Tuple[float,float]: """Returns the closest point on the path to the given point. If edges=False, only computes the distances to the vertices, not the edges. This is slightly faster but less accurate. - + Returns (distance, closest_time) """ distance, closest_index = Path.closest_point(self,x,edges) @@ -254,10 +255,10 @@ def closest_point(self, x : List[float], edges = True) -> Tuple[float,float]: def closest_point_local(self, x : List[float], time_range=Tuple[float,float], edges = True) -> Tuple[float,float]: """Returns the closest point on the path to the given point within the given time range. - + If edges=False, only computes the distances to the vertices, not the edges. This is slightly faster but less accurate. - + Returns (distance, closest_time) """ param_range = [self.time_to_parameter(time_range[0]),self.time_to_parameter(time_range[1])] @@ -265,7 +266,7 @@ def closest_point_local(self, x : List[float], time_range=Tuple[float,float], ed distance, closest_index = Path.closest_point_local(self,x,param_range,edges) closest_time = self.parameter_to_time(closest_index) return distance, closest_time - + def trim(self, start : float, end : float) -> Trajectory: """Returns a copy of this trajectory but trimmed to the given time range.""" sind,su = self.time_to_index(start) @@ -279,8 +280,8 @@ def trim(self, start : float, end : float) -> Trajectory: def compute_headings(path : Path, smoothed = False) -> Path: """Converts a 2D (x,y) path into a 3D path (x,y,heading) or a 3D - (x,y,z) path into a 5D path (x,y,z,heading,pitch). - + (x,y,z) path into a 5D path (x,y,z,heading,pitch). + If smoothed=True, then the path is smoothed using a spline to better estimate good tangent vectors. """ diff --git a/launch/inspection.yaml b/launch/inspection.yaml new file mode 100644 index 000000000..580f3cba9 --- /dev/null +++ b/launch/inspection.yaml @@ -0,0 +1,86 @@ +description: "Run the yielding trajectory planner on the real vehicle with real perception" +mode: hardware +vehicle_interface: gem_hardware.GEMHardwareInterface +mission_execution: StandardExecutor + +# Recovery behavior after a component failure +recovery: + planning: + trajectory_tracking : recovery.StopTrajectoryTracker + +# Driving behavior for the GEM vehicle. Runs real pedestrian perception, yield planner, but does not send commands to real vehicle. +drive: + perception: + state_estimation : GNSSStateEstimator + perception_normalization : StandardPerceptionNormalizer + planning: + route_planning_component: + type: InspectRoutePlanner + args: + state_machine: ['IDLE', 'NAV', 'INSPECT', 'FINISH', 'IDLE'] + # save_lidar_data : capture_lidar_camera_data.SaveInspectionData + motion_planning: longitudinal_planning.YieldTrajectoryPlanner + # type: RouteToTrajectoryPlanner + # args: [null] #desired speed in m/s. If null, this will keep the route untimed for the trajectory tracker + trajectory_tracking: + type: pure_pursuit.PurePursuitTrajectoryTracker + # args: [null] + # args: {desired_speed: 2.5} #approximately 5mph + print: False + + + +log: + # Specify the top-level folder to save the log files. Default is 'logs' + #folder : 'logs' + # If prefix is specified, then the log folder will be named with the prefix followed by the date and time. Default no prefix + #prefix : 'fixed_route_' + # If suffix is specified, then logs will output to folder/prefix+suffix. Default uses date and time as the suffix + #suffix : 'test3' + # Specify which ros topics to record to vehicle.bag. Default records nothing. This records the "standard" ROS topics. + ros_topics : + # Specify options to pass to rosbag record. Default is no options. + #rosbag_options : '--split --size=1024' + # If True, then record all readings / commands of the vehicle interface. Default False + vehicle_interface : True + # Specify which components to record to behavior.json. Default records nothing + components : ['state_estimation','agent_detection','motion_planning'] + # Specify which components of state to record to state.json. Default records nothing + #state: ['all'] + # Specify the rate in Hz at which to record state to state.json. Default records at the pipeline's rate + #state_rate: 10 + # If True, then make plots of the recorded data specifically from PurePursuitTrajectoryTracker_debug.csv and report metrics. Default False + #auto_plot : True +replay: # Add items here to set certain topics / inputs to be replayed from logs + # Specify which log folder to replay from + log: + ros_topics : [] + components : [] + +#usually can keep this constant +computation_graph: !include "../GEMstack/knowledge/defaults/computation_graph.yaml" + +variants: + sim: + run: + description: "Run the yielding trajectory planner in simulation with faked perception" + mode: simulation + vehicle_interface: + type: gem_simulator.GEMDoubleIntegratorSimulationInterface + args: + scene: !relative_path '../scenes/xyhead_demo.yaml' + visualization: !include "klampt_visualization.yaml" + drive: + perception: + # agent_detection : pedestrian_detection.FakePedestrianDetector2D + agent_detection : OmniscientAgentDetector #this option reads agents from the simulator + state_estimation : OmniscientStateEstimator + gazebo: + run: + mode: simulation + vehicle_interface: + type: gem_gazebo.GEMGazeboInterface + drive: + perception: + state_estimation: GNSSStateEstimator # Matches your Gazebo GNSS implementation + # visualization: !include "mpl_visualization.yaml" From 908adc647c598f62942f13201f81256038115ff9 Mon Sep 17 00:00:00 2001 From: animeshsingh98 Date: Sat, 10 May 2025 15:47:25 -0500 Subject: [PATCH 2/7] Resolving comments --- .../onboard/planning/longitudinal_planning.py | 94 ++----------------- .../planning/route_planning_component.py | 3 +- GEMstack/state/all.py | 2 - GEMstack/state/mission_plan.py | 19 ---- GEMstack/state/route.py | 10 -- GEMstack/state/trajectory.py | 5 +- 6 files changed, 9 insertions(+), 124 deletions(-) delete mode 100644 GEMstack/state/mission_plan.py diff --git a/GEMstack/onboard/planning/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py index 65ad6c00a..abf6510ec 100644 --- a/GEMstack/onboard/planning/longitudinal_planning.py +++ b/GEMstack/onboard/planning/longitudinal_planning.py @@ -440,7 +440,6 @@ def longitudinal_plan_milestone( # ============================================= print("-----LONGITUDINAL PLAN-----") - # print("path length: ", path.length()) length = path.length() # If the path is too short, just return the path for preventing sudden halt of simulation @@ -464,11 +463,6 @@ def longitudinal_plan_milestone( new_points.append(cur_point) new_times.append(cur_time) velocities.append(current_speed) - # print("=====================================") - # print("new points: ", new_points) - # print("current index: ", cur_index) - # print("current speed: ", current_speed) - # print("current position: ", cur_point) # Information we will need: # Calculate how much time it would take to stop @@ -477,7 +471,7 @@ def longitudinal_plan_milestone( min_delta_x_stop = ( current_speed * min_delta_t_stop - 0.5 * deceleration * min_delta_t_stop**2 ) - # print(min_delta_x_stop) + assert min_delta_x_stop >= 0 # Check if we are done @@ -495,7 +489,6 @@ def longitudinal_plan_milestone( # keep breaking until the next milestone in path if next_point[0] <= points[-1][0]: - # print("continuing to next point") delta_t_to_next_x = compute_time_to_x( cur_point[0], next_point[0], current_speed, -deceleration ) @@ -511,7 +504,7 @@ def longitudinal_plan_milestone( ) cur_point = next_point cur_time += delta_t_to_next_x - # current_speed would not be exactly zero error would be less than 1e-4 but perfer to just set to zero + # current_speed would not be exactly zero error would be less than 1e-4 but prefer to just set to zero # current_speed -= delta_t_to_next_x*deceleration current_speed = 0 assert current_speed == 0 @@ -520,7 +513,6 @@ def longitudinal_plan_milestone( # because the first if-statement covers for when we decelerating, # the only time current_speed < max_speed is when we are accelerating elif current_speed < max_speed: - # print("In case two") # next point next_point = points[cur_index + 1] # accelerate to max speed @@ -572,10 +564,7 @@ def longitudinal_plan_milestone( cur_point[0] + delta_x_to_max_speed + delta_x_to_stop_from_max_speed >= points[-1][0] ): - # print(delta_x_to_max_speed) - # print(delta_x_to_stop_from_max_speed) # Add a new point at the point where we should start breaking - # print("Adding new point to start breaking") delta_t_to_next_x = compute_time_triangle( cur_point[0], points[-1][0], @@ -584,8 +573,6 @@ def longitudinal_plan_milestone( acceleration, deceleration, ) - # print(delta_t_to_next_x) - # delta_t_to_next_x = compute_time_to_x(cur_point[0], points[-1][0] - min_delta_x_stop, current_speed, acceleration) next_x = ( cur_point[0] + current_speed * delta_t_to_next_x @@ -598,7 +585,6 @@ def longitudinal_plan_milestone( # this is the case where we would reach max speed before the next point # we need to create a new point where we would reach max speed else: - # print("adding new point") # we would need to add a new point at max speed cur_time += delta_t_to_max_speed cur_point = [cur_point[0] + delta_x_to_max_speed, 0] @@ -611,11 +597,9 @@ def longitudinal_plan_milestone( elif current_speed == max_speed: next_point = points[cur_index + 1] # continue on with max speed - # print("In case three") # add point to start decelerating if next_point[0] + min_delta_x_stop >= points[-1][0]: - # print("Adding new point to start decelerating") cur_time += ( points[-1][0] - min_delta_x_stop - cur_point[0] ) / current_speed @@ -623,7 +607,6 @@ def longitudinal_plan_milestone( current_speed = max_speed else: # Continue on to next point - # print("Continuing on to next point") cur_time += (next_point[0] - cur_point[0]) / current_speed cur_point = next_point cur_index += 1 @@ -634,7 +617,6 @@ def longitudinal_plan_milestone( # We need to hit the breaks next_point = points[cur_index + 1] - # print("In case four") # slow down to max speed delta_t_to_max_speed = (current_speed - max_speed) / deceleration delta_x_to_max_speed = ( @@ -676,7 +658,7 @@ def longitudinal_plan_milestone( # ============================================= - trajectory = Trajectory(path.frame, points, times, velocities) + trajectory = Trajectory(path.frame, points, times) return trajectory @@ -852,56 +834,7 @@ def longitudinal_plan_dt( points = [path_norm.eval(s) for s in s_vals] print("Number of time steps is --------------------", num_time_steps) - # return Trajectory(path_norm.frame, points, times) - - # # Plot: update a single window - # import matplotlib.pyplot as plt - # plt.figure("Distance vs Time") - # plt.clf() # Clear the current figure - # plt.plot(times, s_vals) - # plt.xlabel("Time (s)") - # plt.ylabel("Distance (m)") - # plt.title("Distance vs Time") - # plt.draw() - # plt.pause(0.001) - - # 4. Create a time grid. - # dt = 0.1 # adjust based on computation - # times = np.arange(0, t_final + dt, dt) - # num_time_steps = 0 - - # # 5. Compute the distance s(t) for each time step. - # s_vals = [] - # for t in times: - # if profile_type == "trapezoidal": - # if t < t_accel: - # # Acceleration phase. - # s = current_speed * t + 0.5 * acceleration * t**2 - # elif t < t_accel + t_cruise: - # # Cruise phase. - # s = d_accel + max_speed * (t - t_accel) - # else: - # # Deceleration phase. - # t_decel_phase = t - (t_accel + t_cruise) - # # Compute the remaining distance using the deceleration equation. - # s = total_length - 0.5 * deceleration * (t_decel - t_decel_phase)**2 - # else: # Triangular profile. - # if t < t_accel: - # # Acceleration phase. - # s = current_speed * t + 0.5 * acceleration * t**2 - # else: - # t_decel_phase = t - t_accel - # s_accel = current_speed * t_accel + 0.5 * acceleration * t_accel**2 - # s = s_accel + peak_speed * t_decel_phase - 0.5 * deceleration * t_decel_phase**2 - # num_time_steps +=1 - - # # should not exceed total path length - # s_vals.append(min(s, total_length)) - # print("NUmber of time steps -----------",num_time_steps) - # print("T FInal ----------------------------", t_final) - # points = [path_norm.eval(s) for s in s_vals] - - trajectory = Trajectory(path_norm.frame, points, list(times), velocities=velocities) + trajectory = Trajectory(path_norm.frame, points, list(times)) return trajectory @@ -1168,7 +1101,7 @@ def longitudinal_brake( # ============================================= - trajectory = Trajectory(path.frame, points, times, velocities) + trajectory = Trajectory(path.frame, points, times) return trajectory @@ -1177,11 +1110,6 @@ def longitudinal_brake( ################################################################################ -########################## -##### Patrick's Code ##### -########################## - - class YieldTrajectoryPlanner(Component): """Follows the given route. Brakes if you have to yield or you are at the end of the route, otherwise accelerates to @@ -1236,11 +1164,6 @@ def update(self, state: AllState): abs_x = curr_x + state.start_vehicle_pose.x abs_y = curr_y + state.start_vehicle_pose.y - ############################################### - # print("@@@@@ VEHICLE STATE @@@@@") - # print(vehicle) - # print("@@@@@@@@@@@@@@@@@@@@@@@@@") - if self.mode == "real": abs_x = curr_x abs_y = curr_y @@ -1263,16 +1186,11 @@ def update(self, state: AllState): ) self.route_progress = closest_parameter - # lookahead_distance = max(10, curr_v**2 / (2 * self.deceleration)) - # print("Lookahead distance:", lookahead_distance) - route_to_end = route.trim(closest_parameter, len(route.points) - 1) should_yield = False yield_deceleration = 0.0 - # print("Current Speed: ", curr_v) - for r in state.relations: if r.type == EntityRelationEnum.YIELDING and r.obj1 == "": # get the object we are yielding to @@ -1340,4 +1258,4 @@ def update(self, state: AllState): self.planner, ) - return traj \ No newline at end of file + return traj diff --git a/GEMstack/onboard/planning/route_planning_component.py b/GEMstack/onboard/planning/route_planning_component.py index 1eeebe090..3b2f8a94b 100644 --- a/GEMstack/onboard/planning/route_planning_component.py +++ b/GEMstack/onboard/planning/route_planning_component.py @@ -6,9 +6,8 @@ from GEMstack.state.agent import AgentState from GEMstack.state.mission import MissionEnum from GEMstack.state.all import AllState -from GEMstack.state.mission_plan import MissionPlan from GEMstack.state.physical_object import ObjectFrameEnum, ObjectPose -from GEMstack.state.route import PlannerEnum, Route +from GEMstack.state.route import Route from GEMstack.state.vehicle import VehicleState from GEMstack.state.intent import VehicleIntentEnum from .planner import optimized_kinodynamic_rrt_planning diff --git a/GEMstack/state/all.py b/GEMstack/state/all.py index 457628cd2..631d1c6a8 100644 --- a/GEMstack/state/all.py +++ b/GEMstack/state/all.py @@ -7,7 +7,6 @@ from .agent_intent import AgentIntent,AgentIntentMixture from .relations import EntityRelation from .mission import MissionObjective -from .mission_plan import MissionPlan from .route import Route from .trajectory import Trajectory from .predicates import PredicateValues @@ -27,7 +26,6 @@ class AllState(SceneState): # planner-output state mission : MissionObjective = field(default_factory=MissionObjective) - mission_plan: MissionPlan = None intent : VehicleIntent = field(default_factory=VehicleIntent) # planner_type : Optional[PlannerEnum] = None route : Optional[Route] = None diff --git a/GEMstack/state/mission_plan.py b/GEMstack/state/mission_plan.py deleted file mode 100644 index f32ea2639..000000000 --- a/GEMstack/state/mission_plan.py +++ /dev/null @@ -1,19 +0,0 @@ -from dataclasses import dataclass -from ..utils.serialization import register -from .route import PlannerEnum -from typing import Optional -from GEMstack.state.physical_object import ObjectPose - -@dataclass -@register -class MissionPlan: - goal_x: float - goal_y: float - goal_orientation: float - planner_type : PlannerEnum = PlannerEnum.RRT_STAR - goal_vehicle_pose : Optional[ObjectPose] = None - start_vehicle_pose : Optional[ObjectPose] = None - # other mission-specific parameters can be added here - - - \ No newline at end of file diff --git a/GEMstack/state/route.py b/GEMstack/state/route.py index fb9105123..b0686c687 100644 --- a/GEMstack/state/route.py +++ b/GEMstack/state/route.py @@ -4,16 +4,6 @@ from .trajectory import Path from typing import List, Tuple, Optional -from enum import Enum - - -class PlannerEnum(Enum): - RRT_STAR = 0 # position / yaw in m / radians relative to starting pose of vehicle - HYBRID_A_STAR = ( - 1 # position / yaw in m / radians relative to current pose of vehicle - ) - PARKING = 2 # position in longitude / latitude, yaw=heading in radians with respect to true north (used in GNSS) - @dataclass @register diff --git a/GEMstack/state/trajectory.py b/GEMstack/state/trajectory.py index c74145e82..af0f70122 100644 --- a/GEMstack/state/trajectory.py +++ b/GEMstack/state/trajectory.py @@ -106,10 +106,10 @@ def closest_point(self, x : List[float], edges = True) -> Tuple[float,float]: def closest_point_local(self, x : List[float], param_range=Tuple[float,float], edges = True) -> Tuple[float,float]: """Returns the closest point on the path to the given point within the given parameter range. - + If edges=False, only computes the distances to the vertices, not the edges. This is slightly faster but less accurate. - + Returns (distance, closest_parameter) """ best_dist = float('inf') @@ -169,7 +169,6 @@ def trim(self, start : float, end : float) -> Path: class Trajectory(Path): """A timed, piecewise linear path.""" times : List[float] - velocities : Optional[List[float]] = None def domain(self) -> Tuple[float,float]: """Returns the time parameter domain""" From 2f6a22576b29149db2cffb63f0c496c197e08e51 Mon Sep 17 00:00:00 2001 From: animeshsingh98 Date: Sat, 10 May 2025 15:47:25 -0500 Subject: [PATCH 3/7] Resolving comments --- .../onboard/planning/longitudinal_planning.py | 94 ++----------------- .../planning/route_planning_component.py | 3 +- GEMstack/state/all.py | 3 - GEMstack/state/mission_plan.py | 19 ---- GEMstack/state/route.py | 10 -- GEMstack/state/trajectory.py | 5 +- 6 files changed, 9 insertions(+), 125 deletions(-) delete mode 100644 GEMstack/state/mission_plan.py diff --git a/GEMstack/onboard/planning/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py index 65ad6c00a..abf6510ec 100644 --- a/GEMstack/onboard/planning/longitudinal_planning.py +++ b/GEMstack/onboard/planning/longitudinal_planning.py @@ -440,7 +440,6 @@ def longitudinal_plan_milestone( # ============================================= print("-----LONGITUDINAL PLAN-----") - # print("path length: ", path.length()) length = path.length() # If the path is too short, just return the path for preventing sudden halt of simulation @@ -464,11 +463,6 @@ def longitudinal_plan_milestone( new_points.append(cur_point) new_times.append(cur_time) velocities.append(current_speed) - # print("=====================================") - # print("new points: ", new_points) - # print("current index: ", cur_index) - # print("current speed: ", current_speed) - # print("current position: ", cur_point) # Information we will need: # Calculate how much time it would take to stop @@ -477,7 +471,7 @@ def longitudinal_plan_milestone( min_delta_x_stop = ( current_speed * min_delta_t_stop - 0.5 * deceleration * min_delta_t_stop**2 ) - # print(min_delta_x_stop) + assert min_delta_x_stop >= 0 # Check if we are done @@ -495,7 +489,6 @@ def longitudinal_plan_milestone( # keep breaking until the next milestone in path if next_point[0] <= points[-1][0]: - # print("continuing to next point") delta_t_to_next_x = compute_time_to_x( cur_point[0], next_point[0], current_speed, -deceleration ) @@ -511,7 +504,7 @@ def longitudinal_plan_milestone( ) cur_point = next_point cur_time += delta_t_to_next_x - # current_speed would not be exactly zero error would be less than 1e-4 but perfer to just set to zero + # current_speed would not be exactly zero error would be less than 1e-4 but prefer to just set to zero # current_speed -= delta_t_to_next_x*deceleration current_speed = 0 assert current_speed == 0 @@ -520,7 +513,6 @@ def longitudinal_plan_milestone( # because the first if-statement covers for when we decelerating, # the only time current_speed < max_speed is when we are accelerating elif current_speed < max_speed: - # print("In case two") # next point next_point = points[cur_index + 1] # accelerate to max speed @@ -572,10 +564,7 @@ def longitudinal_plan_milestone( cur_point[0] + delta_x_to_max_speed + delta_x_to_stop_from_max_speed >= points[-1][0] ): - # print(delta_x_to_max_speed) - # print(delta_x_to_stop_from_max_speed) # Add a new point at the point where we should start breaking - # print("Adding new point to start breaking") delta_t_to_next_x = compute_time_triangle( cur_point[0], points[-1][0], @@ -584,8 +573,6 @@ def longitudinal_plan_milestone( acceleration, deceleration, ) - # print(delta_t_to_next_x) - # delta_t_to_next_x = compute_time_to_x(cur_point[0], points[-1][0] - min_delta_x_stop, current_speed, acceleration) next_x = ( cur_point[0] + current_speed * delta_t_to_next_x @@ -598,7 +585,6 @@ def longitudinal_plan_milestone( # this is the case where we would reach max speed before the next point # we need to create a new point where we would reach max speed else: - # print("adding new point") # we would need to add a new point at max speed cur_time += delta_t_to_max_speed cur_point = [cur_point[0] + delta_x_to_max_speed, 0] @@ -611,11 +597,9 @@ def longitudinal_plan_milestone( elif current_speed == max_speed: next_point = points[cur_index + 1] # continue on with max speed - # print("In case three") # add point to start decelerating if next_point[0] + min_delta_x_stop >= points[-1][0]: - # print("Adding new point to start decelerating") cur_time += ( points[-1][0] - min_delta_x_stop - cur_point[0] ) / current_speed @@ -623,7 +607,6 @@ def longitudinal_plan_milestone( current_speed = max_speed else: # Continue on to next point - # print("Continuing on to next point") cur_time += (next_point[0] - cur_point[0]) / current_speed cur_point = next_point cur_index += 1 @@ -634,7 +617,6 @@ def longitudinal_plan_milestone( # We need to hit the breaks next_point = points[cur_index + 1] - # print("In case four") # slow down to max speed delta_t_to_max_speed = (current_speed - max_speed) / deceleration delta_x_to_max_speed = ( @@ -676,7 +658,7 @@ def longitudinal_plan_milestone( # ============================================= - trajectory = Trajectory(path.frame, points, times, velocities) + trajectory = Trajectory(path.frame, points, times) return trajectory @@ -852,56 +834,7 @@ def longitudinal_plan_dt( points = [path_norm.eval(s) for s in s_vals] print("Number of time steps is --------------------", num_time_steps) - # return Trajectory(path_norm.frame, points, times) - - # # Plot: update a single window - # import matplotlib.pyplot as plt - # plt.figure("Distance vs Time") - # plt.clf() # Clear the current figure - # plt.plot(times, s_vals) - # plt.xlabel("Time (s)") - # plt.ylabel("Distance (m)") - # plt.title("Distance vs Time") - # plt.draw() - # plt.pause(0.001) - - # 4. Create a time grid. - # dt = 0.1 # adjust based on computation - # times = np.arange(0, t_final + dt, dt) - # num_time_steps = 0 - - # # 5. Compute the distance s(t) for each time step. - # s_vals = [] - # for t in times: - # if profile_type == "trapezoidal": - # if t < t_accel: - # # Acceleration phase. - # s = current_speed * t + 0.5 * acceleration * t**2 - # elif t < t_accel + t_cruise: - # # Cruise phase. - # s = d_accel + max_speed * (t - t_accel) - # else: - # # Deceleration phase. - # t_decel_phase = t - (t_accel + t_cruise) - # # Compute the remaining distance using the deceleration equation. - # s = total_length - 0.5 * deceleration * (t_decel - t_decel_phase)**2 - # else: # Triangular profile. - # if t < t_accel: - # # Acceleration phase. - # s = current_speed * t + 0.5 * acceleration * t**2 - # else: - # t_decel_phase = t - t_accel - # s_accel = current_speed * t_accel + 0.5 * acceleration * t_accel**2 - # s = s_accel + peak_speed * t_decel_phase - 0.5 * deceleration * t_decel_phase**2 - # num_time_steps +=1 - - # # should not exceed total path length - # s_vals.append(min(s, total_length)) - # print("NUmber of time steps -----------",num_time_steps) - # print("T FInal ----------------------------", t_final) - # points = [path_norm.eval(s) for s in s_vals] - - trajectory = Trajectory(path_norm.frame, points, list(times), velocities=velocities) + trajectory = Trajectory(path_norm.frame, points, list(times)) return trajectory @@ -1168,7 +1101,7 @@ def longitudinal_brake( # ============================================= - trajectory = Trajectory(path.frame, points, times, velocities) + trajectory = Trajectory(path.frame, points, times) return trajectory @@ -1177,11 +1110,6 @@ def longitudinal_brake( ################################################################################ -########################## -##### Patrick's Code ##### -########################## - - class YieldTrajectoryPlanner(Component): """Follows the given route. Brakes if you have to yield or you are at the end of the route, otherwise accelerates to @@ -1236,11 +1164,6 @@ def update(self, state: AllState): abs_x = curr_x + state.start_vehicle_pose.x abs_y = curr_y + state.start_vehicle_pose.y - ############################################### - # print("@@@@@ VEHICLE STATE @@@@@") - # print(vehicle) - # print("@@@@@@@@@@@@@@@@@@@@@@@@@") - if self.mode == "real": abs_x = curr_x abs_y = curr_y @@ -1263,16 +1186,11 @@ def update(self, state: AllState): ) self.route_progress = closest_parameter - # lookahead_distance = max(10, curr_v**2 / (2 * self.deceleration)) - # print("Lookahead distance:", lookahead_distance) - route_to_end = route.trim(closest_parameter, len(route.points) - 1) should_yield = False yield_deceleration = 0.0 - # print("Current Speed: ", curr_v) - for r in state.relations: if r.type == EntityRelationEnum.YIELDING and r.obj1 == "": # get the object we are yielding to @@ -1340,4 +1258,4 @@ def update(self, state: AllState): self.planner, ) - return traj \ No newline at end of file + return traj diff --git a/GEMstack/onboard/planning/route_planning_component.py b/GEMstack/onboard/planning/route_planning_component.py index 1eeebe090..3b2f8a94b 100644 --- a/GEMstack/onboard/planning/route_planning_component.py +++ b/GEMstack/onboard/planning/route_planning_component.py @@ -6,9 +6,8 @@ from GEMstack.state.agent import AgentState from GEMstack.state.mission import MissionEnum from GEMstack.state.all import AllState -from GEMstack.state.mission_plan import MissionPlan from GEMstack.state.physical_object import ObjectFrameEnum, ObjectPose -from GEMstack.state.route import PlannerEnum, Route +from GEMstack.state.route import Route from GEMstack.state.vehicle import VehicleState from GEMstack.state.intent import VehicleIntentEnum from .planner import optimized_kinodynamic_rrt_planning diff --git a/GEMstack/state/all.py b/GEMstack/state/all.py index 457628cd2..1f50e9b34 100644 --- a/GEMstack/state/all.py +++ b/GEMstack/state/all.py @@ -7,7 +7,6 @@ from .agent_intent import AgentIntent,AgentIntentMixture from .relations import EntityRelation from .mission import MissionObjective -from .mission_plan import MissionPlan from .route import Route from .trajectory import Trajectory from .predicates import PredicateValues @@ -27,9 +26,7 @@ class AllState(SceneState): # planner-output state mission : MissionObjective = field(default_factory=MissionObjective) - mission_plan: MissionPlan = None intent : VehicleIntent = field(default_factory=VehicleIntent) - # planner_type : Optional[PlannerEnum] = None route : Optional[Route] = None trajectory : Optional[Trajectory] = None diff --git a/GEMstack/state/mission_plan.py b/GEMstack/state/mission_plan.py deleted file mode 100644 index f32ea2639..000000000 --- a/GEMstack/state/mission_plan.py +++ /dev/null @@ -1,19 +0,0 @@ -from dataclasses import dataclass -from ..utils.serialization import register -from .route import PlannerEnum -from typing import Optional -from GEMstack.state.physical_object import ObjectPose - -@dataclass -@register -class MissionPlan: - goal_x: float - goal_y: float - goal_orientation: float - planner_type : PlannerEnum = PlannerEnum.RRT_STAR - goal_vehicle_pose : Optional[ObjectPose] = None - start_vehicle_pose : Optional[ObjectPose] = None - # other mission-specific parameters can be added here - - - \ No newline at end of file diff --git a/GEMstack/state/route.py b/GEMstack/state/route.py index fb9105123..b0686c687 100644 --- a/GEMstack/state/route.py +++ b/GEMstack/state/route.py @@ -4,16 +4,6 @@ from .trajectory import Path from typing import List, Tuple, Optional -from enum import Enum - - -class PlannerEnum(Enum): - RRT_STAR = 0 # position / yaw in m / radians relative to starting pose of vehicle - HYBRID_A_STAR = ( - 1 # position / yaw in m / radians relative to current pose of vehicle - ) - PARKING = 2 # position in longitude / latitude, yaw=heading in radians with respect to true north (used in GNSS) - @dataclass @register diff --git a/GEMstack/state/trajectory.py b/GEMstack/state/trajectory.py index c74145e82..af0f70122 100644 --- a/GEMstack/state/trajectory.py +++ b/GEMstack/state/trajectory.py @@ -106,10 +106,10 @@ def closest_point(self, x : List[float], edges = True) -> Tuple[float,float]: def closest_point_local(self, x : List[float], param_range=Tuple[float,float], edges = True) -> Tuple[float,float]: """Returns the closest point on the path to the given point within the given parameter range. - + If edges=False, only computes the distances to the vertices, not the edges. This is slightly faster but less accurate. - + Returns (distance, closest_parameter) """ best_dist = float('inf') @@ -169,7 +169,6 @@ def trim(self, start : float, end : float) -> Path: class Trajectory(Path): """A timed, piecewise linear path.""" times : List[float] - velocities : Optional[List[float]] = None def domain(self) -> Tuple[float,float]: """Returns the time parameter domain""" From 1cb9910229a4a5f0424f224bf470ed9da0e91ded Mon Sep 17 00:00:00 2001 From: animeshsingh98 Date: Mon, 12 May 2025 12:30:04 -0500 Subject: [PATCH 4/7] Additional changes for saving image --- .../onboard/planning/capture_lidar_camera_data.py | 13 +++++++++++-- .../onboard/planning/route_planning_component.py | 9 +-------- launch/inspection.yaml | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/GEMstack/onboard/planning/capture_lidar_camera_data.py b/GEMstack/onboard/planning/capture_lidar_camera_data.py index fbb6c1de5..33070037e 100644 --- a/GEMstack/onboard/planning/capture_lidar_camera_data.py +++ b/GEMstack/onboard/planning/capture_lidar_camera_data.py @@ -1,6 +1,7 @@ from ...state import AllState, VehicleState, ObjectPose, ObjectFrameEnum, AgentState, AgentEnum, AgentActivityEnum, MissionEnum from ..interface.gem import GEMInterface from ..component import Component +from ...state.intent import VehicleIntentEnum import cv2 from typing import Dict import numpy as np @@ -122,8 +123,16 @@ def update(self, state) -> Dict[str, AgentState]: lidar_pc = self.latest_lidar.copy() camera_fr = self.latest_fr_image.copy() camera_rr = self.latest_rr_image.copy() - cv2.imwrite(os.path.join(self.folder_path, f'fr_{self.index}.png'), camera_fr) - cv2.imwrite(os.path.join(self.folder_path, f'rr_{self.index}.png'), camera_rr) + + ## Image saving logic to avoid storing huge amount of data + if state.intent.type == VehicleIntentEnum.CAMERA_FR: + cv2.imwrite(os.path.join(self.folder_path, f'fr_{self.index}.png'), camera_fr) + elif state.intent.type == VehicleIntentEnum.CAMERA_RR: + cv2.imwrite(os.path.join(self.folder_path, f'rr_{self.index}.png'), camera_rr) + + ## Store all images + # cv2.imwrite(os.path.join(self.folder_path, f'fr_{self.index}.png'), camera_fr) + # cv2.imwrite(os.path.join(self.folder_path, f'rr_{self.index}.png'), camera_rr) # Save GNSS data with open(os.path.join(self.folder_path, 'gnss.txt'), 'a') as fh: diff --git a/GEMstack/onboard/planning/route_planning_component.py b/GEMstack/onboard/planning/route_planning_component.py index 3b2f8a94b..5f6dbee67 100644 --- a/GEMstack/onboard/planning/route_planning_component.py +++ b/GEMstack/onboard/planning/route_planning_component.py @@ -456,7 +456,7 @@ def update(self, state): state.intent = VehicleIntentEnum.CAMERA_RR elif self.mission == "FINISH": - state.mission.type = MissionEnum.HOME + state.mission.type = MissionEnum.INSPECT_UPLOAD goal = ObjectPose( frame=ObjectFrameEnum.START, t=state.start_vehicle_pose.t, @@ -472,12 +472,5 @@ def update(self, state): self.planned_path_already = True self.route = self.x - ## GOAL condition - if (abs(state.vehicle.pose.x - goal.x) <= 3 and abs(state.vehicle.pose.y - goal.y) <= 3): - print(self.state_list[self.index + 1]) - self.mission = self.state_list[self.index + 1] - self.index += 1 - print("CHANGING STATES", self.mission) - print("-------------------------------------------------") return [self.route, state.mission] diff --git a/launch/inspection.yaml b/launch/inspection.yaml index 580f3cba9..d29921a6a 100644 --- a/launch/inspection.yaml +++ b/launch/inspection.yaml @@ -17,7 +17,7 @@ drive: route_planning_component: type: InspectRoutePlanner args: - state_machine: ['IDLE', 'NAV', 'INSPECT', 'FINISH', 'IDLE'] + state_machine: ['IDLE', 'NAV', 'INSPECT', 'FINISH'] # save_lidar_data : capture_lidar_camera_data.SaveInspectionData motion_planning: longitudinal_planning.YieldTrajectoryPlanner # type: RouteToTrajectoryPlanner From 86f872bdb1d5c4edd3f33632d0ceed12796c937c Mon Sep 17 00:00:00 2001 From: Animesh Singh Date: Mon, 12 May 2025 14:13:47 -0500 Subject: [PATCH 5/7] Update python-app.yml --- .github/workflows/python-app.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 3de575d8e..27de95892 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -30,7 +30,7 @@ jobs: - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names - flake8 ./GEMstack --count --select=E9,F63,F7,F82 --show-source --statistics --exclude=__init__.py || exit 1 + flake8 ./GEMstack --count --select=E9,F63,F7,F82 --ignore=F824 --show-source --statistics --exclude=__init__.py || exit 1 # to enable more advanced checks on the repo, uncomment the lines below (There are around 3000 violations) # flake8 ./GEMstack --ignore=D,C901,E402,E231 --count --max-complexity=10 --max-line-length=127 --statistics --exclude=__init__.py || exit 1 # if we want to enable documentation checks, uncomment the line below From c50d67a6cb7605a63b871fde2b112e239d20563c Mon Sep 17 00:00:00 2001 From: Mohammad Fakhreddine Date: Mon, 12 May 2025 14:32:12 -0500 Subject: [PATCH 6/7] Small modification to geotag_from_rosbag.py to handle arg parser. Added script to geotag images for individual files and gnss.txt file. --- GEMstack/scripts/geotag_from_files.py | 64 +++++++++++++++++++ .../{geotag.py => geotag_from_rosbag.py} | 29 +++++++-- 2 files changed, 86 insertions(+), 7 deletions(-) create mode 100644 GEMstack/scripts/geotag_from_files.py rename GEMstack/scripts/{geotag.py => geotag_from_rosbag.py} (90%) diff --git a/GEMstack/scripts/geotag_from_files.py b/GEMstack/scripts/geotag_from_files.py new file mode 100644 index 000000000..48d55977e --- /dev/null +++ b/GEMstack/scripts/geotag_from_files.py @@ -0,0 +1,64 @@ +import numpy as np +import pandas as pd +from scipy.spatial.transform import Rotation as R +from pyproj import Transformer +import os +""" + This script geotags images based on vehicle GNSS data and images collected from the front right and rear right camera. + It computes the camera poses in the world frame and saves them to a geo.txt file which can be automatically processed by OpenDroneMap for 3D reconstruction. +""" +# Fixed camera-to-vehicle transform for front-right camera +T_v_to_c_fr = np.array([1.8861563355156226, -0.7733611068168774, 1.6793040225335112]) #form state estimation team +R_v_to_c_fr = R.from_euler('zyx', [45, 20, 0], degrees=True).as_matrix() + +# Fixed camera-to-vehicle transform for rear-right camera +T_v_to_c_rr = np.array([0.11419591502518789, -0.6896311735924415, 1.711181163333824]) +R_v_to_c_rr = R.from_euler('zyx', [135, 20, 0], degrees=True).as_matrix() + +# Load vehicle pose data +vehicle_df = pd.read_csv("gnss.txt", header=None, + names=["lat", "lon", "alt", "yaw", "roll", "pitch"]) + +# Convert lat/lon to UTM +transformer = Transformer.from_crs("EPSG:4326", "EPSG:32616", always_xy=True) # UTM Zone 16N for Illinois, change if in another zone. + +# Output rows +output_rows = [] + +# Process each row in the GNSS data +for i, row in vehicle_df.iterrows(): + lat, lon, alt = row["lat"], row["lon"], row["alt"] + yaw, roll, pitch = row["yaw"], row["roll"], row["pitch"] + + # Position in UTM (meters) + x, y = transformer.transform(lon, lat) + z = alt + + # Convert yaw, roll, pitch to rotation matrix (vehicle orientation in world frame) + R_vehicle = R.from_euler('zxy', [yaw, roll, pitch], degrees=True).as_matrix() + + # Compute front-right camera pose in world frame + R_camera_fr = R_vehicle @ R_v_to_c_fr + T_camera_fr = R_vehicle @ T_v_to_c_fr + np.array([x, y, z]) + yaw_fr, pitch_fr, roll_fr = R.from_matrix(R_camera_fr).as_euler('zxy', degrees=True) + output_rows.append([f"fr_{i:d}.png", T_camera_fr[0], T_camera_fr[1], T_camera_fr[2], yaw_fr, pitch_fr, roll_fr]) + + # Compute rear-right camera pose in world frame + R_camera_rr = R_vehicle @ R_v_to_c_rr + T_camera_rr = R_vehicle @ T_v_to_c_rr + np.array([x, y, z]) + yaw_rr, pitch_rr, roll_rr = R.from_matrix(R_camera_rr).as_euler('zxy', degrees=True) + output_rows.append([f"rr_{i:d}.png", T_camera_rr[0], T_camera_rr[1], T_camera_rr[2], yaw_rr, pitch_rr, roll_rr]) + +# Save to geo.txt +geo_df = pd.DataFrame(output_rows, columns=["image", "x", "y", "z", "yaw", "pitch", "roll"]) +geo_df.to_csv("geo_temp.txt", sep=" ", index=False, header=False) + +# Add EPSG zone information at the top of the file +epsg_zone = "EPSG:32616" # Replace with your EPSG zone if different +with open("geo.txt", "w") as geo_file: + geo_file.write(f"{epsg_zone}\n") # Write the EPSG zone as the first line + with open("geo_temp.txt", "r") as temp_file: + geo_file.write(temp_file.read()) # Append the rest of the data + +# Remove the temporary file +os.remove("geo_temp.txt") \ No newline at end of file diff --git a/GEMstack/scripts/geotag.py b/GEMstack/scripts/geotag_from_rosbag.py similarity index 90% rename from GEMstack/scripts/geotag.py rename to GEMstack/scripts/geotag_from_rosbag.py index d9b03a213..1ec16df3d 100644 --- a/GEMstack/scripts/geotag.py +++ b/GEMstack/scripts/geotag_from_rosbag.py @@ -7,6 +7,12 @@ import math import os from datetime import datetime +import argparse + +""" +This script geotags images based on vehicle GNSS data and images collected from any of the four corner cameras. +Functionality is limited to data that is extacted from a rosbag file. +""" def rad_to_deg(rad): """Convert radians to degrees""" @@ -63,13 +69,22 @@ def decimal_to_dms(decimal): return lat_dms, lon_dms, lat_ref, lon_ref, alt_ratio, alt_ref -# Create images directory if it doesn't exist -os.makedirs('images', exist_ok=True) - # Open the bag file -bag = rosbag.Bag('fr+gnss+lidar.bag') +parser = argparse.ArgumentParser( + description='A script to geotag images from a rosbag file using GNSS data.' + ) + +parser.add_argument( + 'bag_name', type = str, + help = 'The name of the rosbag file to process.' + ) +args = parser.parse_args() +bag = rosbag.Bag(args.bag_name, 'r') bridge = CvBridge() +# Create images directory if it doesn't exist +os.makedirs('images', exist_ok=True) + # First pass: collect and sort GNSS data print("Collecting GNSS data...") gnss_times = [] @@ -111,11 +126,11 @@ def decimal_to_dms(decimal): exif_dict = { "0th": { - piexif.ImageIFD.Make: "FLIR".encode(), - piexif.ImageIFD.Model: "Blackfly S BFS-U3-16S2C".encode(), + piexif.ImageIFD.Make: "Lucid".encode(), + piexif.ImageIFD.Model: "Triton 2.3 MP".encode(), piexif.ImageIFD.Software: "ROS".encode(), piexif.ImageIFD.DateTime: timestamp.strftime("%Y:%m:%d %H:%M:%S").encode(), - piexif.ImageIFD.ImageDescription: "Captured with ROS and FLIR Blackfly S".encode(), + piexif.ImageIFD.ImageDescription: "Captured with ROS and Lucid Triton 2.3 MP".encode(), piexif.ImageIFD.XResolution: (msg.width, 1), piexif.ImageIFD.YResolution: (msg.height, 1), piexif.ImageIFD.ResolutionUnit: 2, # inches From 9925d57623ec5739e6f0f829dd84af84d61cd1df Mon Sep 17 00:00:00 2001 From: Lord-Bubbles Date: Tue, 13 May 2025 11:38:19 -0500 Subject: [PATCH 7/7] Update server url --- GEMstack/onboard/planning/route_planning_component.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/GEMstack/onboard/planning/route_planning_component.py b/GEMstack/onboard/planning/route_planning_component.py index 5f6dbee67..46d19ce76 100644 --- a/GEMstack/onboard/planning/route_planning_component.py +++ b/GEMstack/onboard/planning/route_planning_component.py @@ -143,7 +143,7 @@ def create_path_around_inspection(inspection_area, geofence, margin=1.0): return full_path -def check_point_exists(vehicle, start_pose, server_url="https://summon-app-production.up.railway.app"): +def check_point_exists(vehicle, start_pose, server_url="https://cs588-prod.up.railway.app"): """Querying the frontend to get the inspection area.""" print("Vehicle pose frame", vehicle.pose.frame) try: