diff --git a/GEMstack/onboard/perception/cone_detection.py b/GEMstack/onboard/perception/cone_detection.py new file mode 100644 index 000000000..d41c527ae --- /dev/null +++ b/GEMstack/onboard/perception/cone_detection.py @@ -0,0 +1,666 @@ +from ...state import AllState, VehicleState, ObjectPose, ObjectFrameEnum, AgentState, AgentEnum, AgentActivityEnum +from ..interface.gem import GEMInterface +from ..component import Component +from ultralytics import YOLO +import cv2 +from typing import Dict +import open3d as o3d +import numpy as np +from sklearn.cluster import DBSCAN +from scipy.spatial.transform import Rotation as R +import rospy +from sensor_msgs.msg import PointCloud2, Image +import sensor_msgs.point_cloud2 as pc2 +import struct, ctypes +from message_filters import Subscriber, ApproximateTimeSynchronizer +from cv_bridge import CvBridge +import time +import math +import ros_numpy +import os + + +# ----- Helper Functions ----- + +def cylindrical_roi(points, center, radius, height): + horizontal_dist = np.linalg.norm(points[:, :2] - center[:2], axis=1) + vertical_diff = np.abs(points[:, 2] - center[2]) + mask = (horizontal_dist <= radius) & (vertical_diff <= height / 2) + return points[mask] + + +def undistort_image(image, K, D): + h, w = image.shape[:2] + newK, _ = cv2.getOptimalNewCameraMatrix(K, D, (w, h), 1, (w, h)) + undistorted = cv2.undistort(image, K, D, None, newK) + return undistorted, newK + +def filter_points_within_threshold(points, threshold=15.0): + distances = np.linalg.norm(points, axis=1) + mask = distances <= threshold + return points[mask] + + +def match_existing_cone( + new_center: np.ndarray, + new_dims: tuple, + existing_agents: Dict[str, AgentState], + distance_threshold: float = 1.0 +) -> str: + """ + Find the closest existing Cone agent within a specified distance threshold. + """ + best_agent_id = None + best_dist = float('inf') + for agent_id, agent_state in existing_agents.items(): + old_center = np.array([agent_state.pose.x, agent_state.pose.y, agent_state.pose.z]) + dist = np.linalg.norm(new_center - old_center) + if dist < distance_threshold and dist < best_dist: + best_dist = dist + best_agent_id = agent_id + return best_agent_id + + +def compute_velocity(old_pose: ObjectPose, new_pose: ObjectPose, dt: float) -> tuple: + """ + Compute the (vx, vy, vz) velocity based on change in pose over time. + """ + if dt <= 0: + return (0, 0, 0) + vx = (new_pose.x - old_pose.x) / dt + vy = (new_pose.y - old_pose.y) / dt + vz = (new_pose.z - old_pose.z) / dt + return (vx, vy, vz) + + +def extract_roi_box(lidar_pc, center, half_extents): + """ + Extract a region of interest (ROI) from the LiDAR point cloud defined by an axis-aligned bounding box. + """ + lower = center - half_extents + upper = center + half_extents + mask = np.all((lidar_pc >= lower) & (lidar_pc <= upper), axis=1) + return lidar_pc[mask] + + +def pc2_to_numpy(pc2_msg, want_rgb=False): + """ + Convert a ROS PointCloud2 message into a numpy array quickly using ros_numpy. + This function extracts the x, y, z coordinates from the point cloud. + """ + # Convert the ROS message to a numpy structured array + pc = ros_numpy.point_cloud2.pointcloud2_to_array(pc2_msg) + # Stack x,y,z fields to a (N,3) array + pts = np.stack((np.array(pc['x']).ravel(), + np.array(pc['y']).ravel(), + np.array(pc['z']).ravel()), axis=1) + # Apply filtering (for example, x > 0 and z in a specified range) + mask = (pts[:, 0] > -0.5) & (pts[:, 2] < -1) & (pts[:, 2] > -2.7) + return pts[mask] + + +def backproject_pixel(u, v, K): + """ + Backprojects a pixel coordinate (u, v) into a normalized 3D ray in the camera coordinate system. + """ + cx, cy = K[0, 2], K[1, 2] + fx, fy = K[0, 0], K[1, 1] + x = (u - cx) / fx + y = (v - cy) / fy + ray_dir = np.array([x, y, 1.0]) + return ray_dir / np.linalg.norm(ray_dir) + + +def find_human_center_on_ray(lidar_pc, ray_origin, ray_direction, + t_min, t_max, t_step, + distance_threshold, min_points, ransac_threshold): + """ + Identify the center of a human along a projected ray. + (This function is no longer used in the new approach.) + """ + return None, None, None + + +def extract_roi(pc, center, roi_radius): + """ + Extract points from a point cloud that lie within a specified radius of a center point. + """ + distances = np.linalg.norm(pc - center, axis=1) + return pc[distances < roi_radius] + + +def refine_cluster(roi_points, center, eps=0.2, min_samples=10): + """ + Refine a point cluster by applying DBSCAN and return the cluster closest to 'center'. + """ + if roi_points.shape[0] < min_samples: + return roi_points + clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(roi_points) + labels = clustering.labels_ + valid_clusters = [roi_points[labels == l] for l in set(labels) if l != -1] + if not valid_clusters: + return roi_points + best_cluster = min(valid_clusters, key=lambda c: np.linalg.norm(np.mean(c, axis=0) - center)) + return best_cluster + + +def remove_ground_by_min_range(cluster, z_range=0.05): + """ + Remove points within z_range of the minimum z (assumed to be ground). + """ + if cluster is None or cluster.shape[0] == 0: + return cluster + min_z = np.min(cluster[:, 2]) + filtered = cluster[cluster[:, 2] > (min_z + z_range)] + return filtered + + +def get_bounding_box_center_and_dimensions(points): + """ + Calculate the axis-aligned bounding box's center and dimensions for a set of 3D points. + """ + if points.shape[0] == 0: + return None, None + min_vals = np.min(points, axis=0) + max_vals = np.max(points, axis=0) + center = (min_vals + max_vals) / 2 + dimensions = max_vals - min_vals + return center, dimensions + + +def create_ray_line_set(start, end): + """ + Create an Open3D LineSet object representing a ray between two 3D points. + The line is colored yellow. + """ + points = [start, end] + lines = [[0, 1]] + line_set = o3d.geometry.LineSet() + line_set.points = o3d.utility.Vector3dVector(points) + line_set.lines = o3d.utility.Vector2iVector(lines) + line_set.colors = o3d.utility.Vector3dVector([[1, 1, 0]]) + return line_set + + +def downsample_points(lidar_points, voxel_size=0.15): + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(lidar_points) + down_pcd = pcd.voxel_down_sample(voxel_size=voxel_size) + return np.asarray(down_pcd.points) + + +def filter_depth_points(lidar_points, max_depth_diff=0.9, use_norm=True): + if lidar_points.shape[0] == 0: + return lidar_points + + if use_norm: + depths = np.linalg.norm(lidar_points, axis=1) + else: + depths = lidar_points[:, 0] + + min_depth = np.min(depths) + max_possible_depth = min_depth + max_depth_diff + mask = depths < max_possible_depth + return lidar_points[mask] + + +def visualize_geometries(geometries, window_name="Open3D", width=800, height=600, point_size=5.0): + """ + Visualize a list of Open3D geometry objects in a dedicated window. + """ + vis = o3d.visualization.Visualizer() + vis.create_window(window_name=window_name, width=width, height=height) + for geom in geometries: + vis.add_geometry(geom) + opt = vis.get_render_option() + opt.point_size = point_size + vis.run() + vis.destroy_window() + + +def pose_to_matrix(pose): + """ + Compose a 4x4 transformation matrix from a pose state. + Assumes pose has attributes: x, y, z, yaw, pitch, roll, + where the angles are given in degrees. + """ + x = pose.x if pose.x is not None else 0.0 + y = pose.y if pose.y is not None else 0.0 + z = pose.z if pose.z is not None else 0.0 + if pose.yaw is not None and pose.pitch is not None and pose.roll is not None: + yaw = math.radians(pose.yaw) + pitch = math.radians(pose.pitch) + roll = math.radians(pose.roll) + else: + yaw = 0.0 + pitch = 0.0 + roll = 0.0 + R_mat = R.from_euler('zyx', [yaw, pitch, roll]).as_matrix() + T = np.eye(4) + T[:3, :3] = R_mat + T[:3, 3] = np.array([x, y, z]) + return T + + +def transform_points_l2c(lidar_points, T_l2c): + N = lidar_points.shape[0] + pts_hom = np.hstack((lidar_points, np.ones((N, 1)))) # (N,4) + pts_cam = (T_l2c @ pts_hom.T).T # (N,4) + return pts_cam[:, :3] + + +# ----- New: Vectorized projection function ----- +def project_points(pts_cam, K, original_lidar_points): + """ + Vectorized version. + pts_cam: (N,3) array of points in camera coordinates. + original_lidar_points: (N,3) array of points in LiDAR coordinates. + Returns a (M,5) array: [u, v, X_lidar, Y_lidar, Z_lidar] for all points with Z>0. + """ + mask = pts_cam[:, 2] > 0 + pts_cam_valid = pts_cam[mask] + lidar_valid = original_lidar_points[mask] + Xc = pts_cam_valid[:, 0] + Yc = pts_cam_valid[:, 1] + Zc = pts_cam_valid[:, 2] + u = (K[0, 0] * (Xc / Zc) + K[0, 2]).astype(np.int32) + v = (K[1, 1] * (Yc / Zc) + K[1, 2]).astype(np.int32) + proj = np.column_stack((u, v, lidar_valid)) + return proj + + +class ConeDetector3D(Component): + """ + Detects cones by fusing YOLO 2D detections with LiDAR point cloud data. + + Tracking is optional: set `enable_tracking=False` to disable persistent tracking + and return only detections from the current frame. + """ + + def __init__(self, vehicle_interface: GEMInterface): + self.vehicle_interface = vehicle_interface + self.enable_tracking = False + self.current_agents = {} + self.tracked_agents = {} + self.cone_counter = 0 + self.latest_image = None + self.latest_lidar = None + self.bridge = CvBridge() + self.start_pose_abs = None + self.camera_front = True + self.visualize_2d = False + self.use_cyl_roi = False + self.start_time = None + self.use_start_frame = False + self.save_data = False + self.orientation = False + + def rate(self) -> float: + return 4.0 + + def state_inputs(self) -> list: + return ['vehicle'] + + def state_outputs(self) -> list: + return ['agents'] + + def initialize(self): + self.rgb_sub = Subscriber('/oak/rgb/image_raw', Image) + self.lidar_sub = Subscriber('/ouster/points', PointCloud2) + self.sync = ApproximateTimeSynchronizer([self.rgb_sub, self.lidar_sub], + queue_size=10, slop=0.1) + self.sync.registerCallback(self.synchronized_callback) + self.detector = YOLO('/home/gem/s2025_perception_merge/GEMstack/GEMstack/knowledge/detection/cone.pt') + self.detector.to('cuda') + + if self.camera_front: + self.K = np.array([[684.83331299, 0., 573.37109375], + [0., 684.60968018, 363.70092773], + [0., 0., 1.]]) + else: + self.K = np.array([[1.17625545e+03, 0.00000000e+00, 9.66432645e+02], + [0.00000000e+00, 1.17514569e+03, 6.08580326e+02], + [0.00000000e+00, 0.00000000e+00, 1.00000000e+00]]) + + if self.camera_front: + self.D = np.array([0.0, 0.0, 0.0, 0.0, 0.0]) + else: + self.D = np.array([-2.70136325e-01, 1.64393255e-01, -1.60720782e-03, -7.41246708e-05, + -6.19939758e-02]) + + self.T_l2v = np.array([[0.99939639, 0.02547917, 0.023615, 1.1], + [-0.02530848, 0.99965156, -0.00749882, 0.03773583], + [-0.02379784, 0.00689664, 0.999693, 1.95320223], + [0., 0., 0., 1.]]) + if self.camera_front: + self.T_l2c = np.array([ + [0.001090, -0.999489, -0.031941, 0.149698], + [-0.007664, 0.031932, -0.999461, -0.397813], + [0.999970, 0.001334, -0.007625, -0.691405], + [0., 0., 0., 1.000000] + ]) + else: + self.T_l2c = np.array([[-0.71836368, -0.69527204, -0.02346088, 0.05718003], + [-0.09720448, 0.13371206, -0.98624154, -0.1598301], + [0.68884317, -0.7061996, -0.16363744, -1.04767285], + [0., 0., 0., 1.]] + ) + self.T_c2l = np.linalg.inv(self.T_l2c) + self.R_c2l = self.T_c2l[:3, :3] + self.camera_origin_in_lidar = self.T_c2l[:3, 3] + + def synchronized_callback(self, image_msg, lidar_msg): + step1 = time.time() + try: + self.latest_image = self.bridge.imgmsg_to_cv2(image_msg, "bgr8") + except Exception as e: + rospy.logerr("Failed to convert image: {}".format(e)) + self.latest_image = None + step2 = time.time() + self.latest_lidar = pc2_to_numpy(lidar_msg, want_rgb=False) + step3 = time.time() + print('image callback: ', step2 - step1, 'lidar callback ', step3 - step2) + + def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: + downsample = False + if self.latest_image is None or self.latest_lidar is None: + return {} + + # Ensure data/ exists and build timestamp + os.makedirs("data", exist_ok=True) + current_time = self.vehicle_interface.time() + if self.start_time is None: + self.start_time = current_time + time_elapsed = current_time - self.start_time + + if self.save_data: + tstamp = int(self.vehicle_interface.time() * 1000) + # 1) Dump raw image + cv2.imwrite(f"data/{tstamp}_image.png", self.latest_image) + # 2) Dump raw LiDAR + np.savez(f"data/{tstamp}_lidar.npz", lidar=self.latest_lidar) + # 3) Write BEFORE_TRANSFORM + with open(f"data/{tstamp}_vehstate.txt", "w") as f: + vp = vehicle.pose + f.write( + f"BEFORE_TRANSFORM " + f"x={vp.x:.3f}, y={vp.y:.3f}, z={vp.z:.3f}, " + f"yaw={vp.yaw:.2f}, pitch={vp.pitch:.2f}, roll={vp.roll:.2f}\n" + ) + # Compute vehicle_start_pose in either START or CURRENT + if self.use_start_frame: + if self.start_pose_abs is None: + self.start_pose_abs = vehicle.pose + vehicle_start_pose = vehicle.pose.to_frame( + ObjectFrameEnum.START, + vehicle.pose, + self.start_pose_abs + ) + mode = "START" + else: + vehicle_start_pose = vehicle.pose + mode = "CURRENT" + with open(f"data/{tstamp}_vehstate.txt", "a") as f: + f.write( + f"AFTER_TRANSFORM " + f"x={vehicle_start_pose.x:.3f}, " + f"y={vehicle_start_pose.y:.3f}, " + f"z={vehicle_start_pose.z:.3f}, " + f"yaw={vehicle_start_pose.yaw:.2f}, " + f"pitch={vehicle_start_pose.pitch:.2f}, " + f"roll={vehicle_start_pose.roll:.2f}, " + f"frame={mode}\n" + ) + + undistorted_img, current_K = undistort_image(self.latest_image, self.K, self.D) + self.current_K = current_K + self.latest_image = undistorted_img + orig_H, orig_W = undistorted_img.shape[:2] + + # --- Begin modifications for three-angle detection --- + img_normal = undistorted_img + results_normal = self.detector(img_normal, conf=0.3, classes=[0]) + combined_boxes = [] + if self.orientation: + img_left = cv2.rotate(undistorted_img.copy(), cv2.ROTATE_90_COUNTERCLOCKWISE) + img_right = cv2.rotate(undistorted_img.copy(), cv2.ROTATE_90_CLOCKWISE) + results_left = self.detector(img_left, conf=0.3, classes=[0]) + results_right = self.detector(img_right, conf=0.3, classes=[0]) + boxes_left = np.array(results_left[0].boxes.xywh.cpu()) if len(results_left) > 0 else [] + boxes_right = np.array(results_right[0].boxes.xywh.cpu()) if len(results_right) > 0 else [] + for box in boxes_left: + cx, cy, w, h = box + new_cx = cy + new_cy = orig_W - 1 - cx + combined_boxes.append((new_cx, new_cy, h, w, AgentActivityEnum.RIGHT)) + for box in boxes_right: + cx, cy, w, h = box + new_cx = orig_H - 1 - cy + new_cy = cx + combined_boxes.append((new_cx, new_cy, h, w, AgentActivityEnum.LEFT)) + + boxes_normal = np.array(results_normal[0].boxes.xywh.cpu()) if len(results_normal) > 0 else [] + for box in boxes_normal: + cx, cy, w, h = box + combined_boxes.append((cx, cy, w, h, AgentActivityEnum.STANDING)) + + if getattr(self, 'visualize_2d', False): + for (cx, cy, w, h, activity) in combined_boxes: + left = int(cx - w / 2) + right = int(cx + w / 2) + top = int(cy - h / 2) + bottom = int(cy + h / 2) + if activity == AgentActivityEnum.STANDING: + color = (255, 0, 0) + label = "STANDING" + elif activity == AgentActivityEnum.RIGHT: + color = (0, 255, 0) + label = "RIGHT" + elif activity == AgentActivityEnum.LEFT: + color = (0, 0, 255) + label = "LEFT" + else: + color = (255, 255, 255) + label = "UNKNOWN" + cv2.rectangle(undistorted_img, (left, top), (right, bottom), color, 2) + cv2.putText(undistorted_img, label, (left, max(top - 5, 20)), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) + cv2.imshow("Detection - Cone 2D", undistorted_img) + + if downsample: + lidar_down = downsample_points(self.latest_lidar, voxel_size=0.1) + else: + lidar_down = self.latest_lidar.copy() + + pts_cam = transform_points_l2c(lidar_down, self.T_l2c) + projected_pts = project_points(pts_cam, self.current_K, lidar_down) + + agents = {} + + for i, box_info in enumerate(combined_boxes): + cx, cy, w, h, activity = box_info + start = time.time() + left = int(cx - w / 2) + right = int(cx + w / 2) + top = int(cy - h / 2) + bottom = int(cy + h / 2) + mask = (projected_pts[:, 0] >= left) & (projected_pts[:, 0] <= right) & \ + (projected_pts[:, 1] >= top) & (projected_pts[:, 1] <= bottom) + roi_pts = projected_pts[mask] + if roi_pts.shape[0] < 5: + continue + points_3d = roi_pts[:, 2:5] + points_3d = filter_points_within_threshold(points_3d, 30) + points_3d = filter_depth_points(points_3d, max_depth_diff=0.3) + + if self.use_cyl_roi: + global_filtered = filter_points_within_threshold(lidar_down, 30) + roi_cyl = cylindrical_roi(global_filtered, np.mean(points_3d, axis=0), radius=0.3, height=1.2) + refined_cluster = remove_ground_by_min_range(roi_cyl, z_range=0.01) + refined_cluster = filter_depth_points(refined_cluster, max_depth_diff=0.2) + else: + refined_cluster = remove_ground_by_min_range(points_3d, z_range=0.05) + end1 = time.time() + if refined_cluster.shape[0] < 4: + continue + + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(refined_cluster) + obb = pcd.get_oriented_bounding_box() + refined_center = obb.center + dims = tuple(obb.extent) + R_lidar = obb.R.copy() + end2 = time.time() + + refined_center_hom = np.append(refined_center, 1) + refined_center_vehicle_hom = self.T_l2v @ refined_center_hom + refined_center_vehicle = refined_center_vehicle_hom[:3] + + R_vehicle = self.T_l2v[:3, :3] @ R_lidar + yaw, pitch, roll = R.from_matrix(R_vehicle).as_euler('zyx', degrees=False) + refined_center = refined_center_vehicle + + if self.use_start_frame: + if self.start_pose_abs is None: + self.start_pose_abs = vehicle.pose + vehicle_start_pose = vehicle.pose.to_frame( + ObjectFrameEnum.START, + vehicle.pose, + self.start_pose_abs + ) + T_vehicle_to_start = pose_to_matrix(vehicle_start_pose) + xp, yp, zp = (T_vehicle_to_start @ np.append(refined_center, 1))[:3] + out_frame = ObjectFrameEnum.START + else: + xp, yp, zp = refined_center + out_frame = ObjectFrameEnum.CURRENT + + new_pose = ObjectPose( + t=current_time, + x=xp, y=yp, z=zp, + yaw=yaw, pitch=pitch, roll=roll, + frame=out_frame + ) + + # --- Optional tracking --- + if self.enable_tracking: + existing_id = match_existing_cone( + np.array([new_pose.x, new_pose.y, new_pose.z]), + dims, + self.tracked_agents, + distance_threshold=2.0 + ) + if existing_id is not None: + old_state = self.tracked_agents[existing_id] + if vehicle.v < 0.1: + alpha = 0.1 + avg_x = alpha * new_pose.x + (1 - alpha) * old_state.pose.x + avg_y = alpha * new_pose.y + (1 - alpha) * old_state.pose.y + avg_z = alpha * new_pose.z + (1 - alpha) * old_state.pose.z + avg_yaw = alpha * new_pose.yaw + (1 - alpha) * old_state.pose.yaw + avg_pitch = alpha * new_pose.pitch + (1 - alpha) * old_state.pose.pitch + avg_roll = alpha * new_pose.roll + (1 - alpha) * old_state.pose.roll + + updated_pose = ObjectPose( + t=new_pose.t, + x=avg_x, + y=avg_y, + z=avg_z, + yaw=avg_yaw, + pitch=avg_pitch, + roll=avg_roll, + frame=new_pose.frame + ) + updated_agent = AgentState( + pose=updated_pose, + dimensions=dims, + outline=None, + type=AgentEnum.CONE, + activity=activity, + velocity=(0, 0, 0), + yaw_rate=0 + ) + else: + updated_agent = old_state + agents[existing_id] = updated_agent + self.tracked_agents[existing_id] = updated_agent + else: + agent_id = f"Cone{self.cone_counter}" + self.cone_counter += 1 + new_agent = AgentState( + pose=new_pose, + dimensions=dims, + outline=None, + type=AgentEnum.CONE, + activity=activity, + velocity=(0, 0, 0), + yaw_rate=0 + ) + agents[agent_id] = new_agent + self.tracked_agents[agent_id] = new_agent + else: + agent_id = f"Cone{self.cone_counter}" + self.cone_counter += 1 + new_agent = AgentState( + pose=new_pose, + dimensions=dims, + outline=None, + type=AgentEnum.CONE, + activity=activity, + velocity=(0, 0, 0), + yaw_rate=0 + ) + agents[agent_id] = new_agent + + self.current_agents = agents + + # If tracking not enabled, return only current frame detections + if not self.enable_tracking: + return self.current_agents + + stale_ids = [agent_id for agent_id, agent in self.tracked_agents.items() + if current_time - agent.pose.t > 5.0] + for agent_id in stale_ids: + rospy.loginfo(f"Removing stale agent: {agent_id}\n") + del self.tracked_agents[agent_id] + + return self.tracked_agents + + # ----- Fake Cone Detector 2D (for Testing Purposes) ----- + +class FakConeDetector(Component): + def __init__(self, vehicle_interface: GEMInterface): + self.vehicle_interface = vehicle_interface + self.times = [(5.0, 20.0), (30.0, 35.0)] + self.t_start = None + + def rate(self): + return 4.0 + + def state_inputs(self): + return ['vehicle'] + + def state_outputs(self): + return ['agents'] + + def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: + if self.t_start is None: + self.t_start = self.vehicle_interface.time() + t = self.vehicle_interface.time() - self.t_start + res = {} + for time_range in self.times: + if t >= time_range[0] and t <= time_range[1]: + res['cone0'] = box_to_fake_agent((0, 0, 0, 0)) + rospy.loginfo("Detected a Cone (simulated)") + return res + +def box_to_fake_agent(box): + x, y, w, h = box + pose = ObjectPose(t=0, x=x + w / 2, y=y + h / 2, z=0, yaw=0, pitch=0, roll=0, frame=ObjectFrameEnum.CURRENT) + dims = (w, h, 0) + return AgentState(pose=pose, dimensions=dims, outline=None, + type=AgentEnum.CONE, activity=AgentActivityEnum.MOVING, + velocity=(0, 0, 0), yaw_rate=0) + +if __name__ == '__main__': + pass diff --git a/GEMstack/onboard/perception/sensorFusion/eval_3d_bbox_performance.py b/GEMstack/onboard/perception/sensorFusion/eval_3d_bbox_performance.py new file mode 100644 index 000000000..58b14cb92 --- /dev/null +++ b/GEMstack/onboard/perception/sensorFusion/eval_3d_bbox_performance.py @@ -0,0 +1,683 @@ +import argparse +import os +import numpy as np +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt +from collections import defaultdict +import math +import warnings +import traceback + +###### Bbox coord mapping ###### +IDX_X, IDX_Y, IDX_Z = 0, 1, 2 +IDX_L, IDX_W, IDX_H = 3, 4, 5 +IDX_YAW = 6 +IDX_CLASS = 7 +IDX_SCORE = 8 + +###### Helper Functions ###### + +def parse_box_line(line, is_gt=False): + """ + Parses a line from a KITTI format label file. + Handles the conversion from KITTI's y-coordinate (bottom of box) + to the geometric center y used internally. + """ + parts = line.strip().split() + try: + # Parses standard KITTI format: + # type truncated occluded alpha bbox_2d(4) dims(3) loc(3) rotation_y (score) + # 0 1 2 3 4-7 8-10 11-13 14 (15) + + # Check minimum length for standard KITTI format (15 fields without score) + if len(parts) < 15: + raise ValueError(f"Line does not have enough parts for KITTI format (expected >= 15, got {len(parts)})") + + class_label = parts[0] + + # Read dimensions (h, w, l) from KITTI standard indices 8, 9, 10 + h = float(parts[8]) + w = float(parts[9]) + l = float(parts[10]) + + # Read location (x, y_bottom, z) from KITTI standard indices 11, 12, 13 + loc_x = float(parts[11]) + loc_y_bottom = float(parts[12]) # KITTI 'y' is bottom of the box in camera coords + loc_z = float(parts[13]) + + # Read rotation_y (yaw) from KITTI standard index 14 + rot_y = float(parts[14]) + + # Convert to internal format + # Internal format: [cx, cy, cz, l, w, h, yaw, class_label, score] + + # Use KITTI loc_x and loc_z directly for internal cx, cz + cx = loc_x + cz = loc_z + + # Convert KITTI y (bottom of box) to geometric center y for internal use. + # In KITTI camera coords, Y points down, so center is 'above' bottom (numerically smaller y): loc_y - h / 2 + cy = loc_y_bottom - (h / 2.0) + + # Use KITTI rot_y directly for internal yaw + yaw = rot_y + + # Create the standardized box list [cx, cy, cz, l, w, h, yaw, class_label] + # Note the internal dimension order (l, w, h) differs from KITTI input order (h, w, l) + box = [cx, cy, cz, l, w, h, yaw, class_label] + + # Append score if it's a prediction file (standard KITTI index 15) + if not is_gt: + if len(parts) > 15: + score = float(parts[15]) + else: + # Handle missing score in prediction file if needed + score = 1.0 # Assigning default 1.0 as before + warnings.warn(f"[LOG] Prediction line missing score (field 16), assigning default 1.0: {line.strip()}") + box.append(score) # Append score using IDX_SCORE implicitly + + return box + + except ValueError as ve: + print(f"[LOG] Error parsing line: '{line.strip()}' - {ve}") + print("[LOG] Does input follows KITTI format ?") + return None + except Exception as e: + print(f"[LOG] Unexpected error parsing line: '{line.strip()}' - {e}") + traceback.print_exc() # Print detailed traceback for unexpected errors + return None + + +def load_labels(file_path, is_gt=False): + """Loads ground truth or prediction boxes from a KITTI-format file.""" + boxes = defaultdict(list) + if not os.path.exists(file_path): + # Reduced verbosity for missing prediction files (common) + # print(f"[LOG] File not found {file_path}, returning empty dict.") + return boxes + try: + with open(file_path, 'r') as f: + for i, line in enumerate(f): + box = parse_box_line(line, is_gt=is_gt) + if box: + class_label = box[IDX_CLASS] + boxes[class_label].append(box) + else: + print(f"[LOG] Skipping invalid line {i+1} in {file_path}") + except Exception as e: + print(f"[LOG] Error reading file {file_path}: {e}") + traceback.print_exc() + return boxes + +def calculate_3d_iou(box1, box2): + """ + Calculates the 3D Intersection over Union (IoU) between two bounding boxes. + + Args: + box1, box2: List or tuple representing a 3D bounding box in the + *internal standardized format*: [cx, cy, cz, l, w, h, yaw, ...] + where cy is the geometric center y. + + Returns: + float: The 3D IoU value. + + Doesn't consider yaw + """ + + ####### Simple Axis-Aligned Bounding Box (AABB) IoU ####### + def get_aabb_corners(box): + # Uses internal format where cy is geometric center + cx, cy, cz, l, w, h = box[IDX_X], box[IDX_Y], box[IDX_Z], box[IDX_L], box[IDX_W], box[IDX_H] + # Ignores yaw + min_x, max_x = cx - l / 2, cx + l / 2 + min_y, max_y = cy - h / 2, cy + h / 2 # Uses geometric center cy + min_z, max_z = cz - w / 2, cz + w / 2 # Assuming cz is center z (depth) + return min_x, max_x, min_y, max_y, min_z, max_z + + min_x1, max_x1, min_y1, max_y1, min_z1, max_z1 = get_aabb_corners(box1) + min_x2, max_x2, min_y2, max_y2, min_z2, max_z2 = get_aabb_corners(box2) + + # Calculate intersection volume + inter_min_x = max(min_x1, min_x2) + inter_max_x = min(max_x1, max_x2) + inter_min_y = max(min_y1, min_y2) + inter_max_y = min(max_y1, max_y2) + inter_min_z = max(min_z1, min_z2) + inter_max_z = min(max_z1, max_z2) + + inter_l = max(0, inter_max_x - inter_min_x) + inter_h = max(0, inter_max_y - inter_min_y) + inter_w = max(0, inter_max_z - inter_min_z) + intersection_volume = inter_l * inter_h * inter_w + + # Calculate union volume + vol1 = box1[IDX_L] * box1[IDX_W] * box1[IDX_H] + vol2 = box2[IDX_L] * box2[IDX_W] * box2[IDX_H] + # Ensure volumes are positive and non-zero for stable IoU + vol1 = max(vol1, 1e-6) + vol2 = max(vol2, 1e-6) + union_volume = vol1 + vol2 - intersection_volume + union_volume = max(union_volume, 1e-6) # Avoid division by zero or very small numbers + + iou = intersection_volume / union_volume + # Clamp IoU to [0, 1] range + iou = max(0.0, min(iou, 1.0)) + return iou + + +def calculate_ap(precision, recall): + """Calculates Average Precision using the PASCAL VOC method (area under monotonic PR curve).""" + # Convert to numpy arrays first for safety + if not isinstance(precision, (list, np.ndarray)) or not isinstance(recall, (list, np.ndarray)): + return 0.0 + precision = np.array(precision) + recall = np.array(recall) + + if precision.size == 0 or recall.size == 0: + return 0.0 + + # Prepend/Append points for interpolation boundaries + recall = np.concatenate(([0.], recall, [1.0])) + precision = np.concatenate(([0.], precision, [0.])) # Start with 0 precision at recall 0, end with 0 at recall 1 + + # Make precision monotonically decreasing (handles PR curve 'jiggles') + for i in range(len(precision) - 2, -1, -1): + precision[i] = max(precision[i], precision[i+1]) + + # Find indices where recall changes (avoids redundant calculations) + indices = np.where(recall[1:] != recall[:-1])[0] + 1 + + # Compute AP using the area under the curve (sum of rectangle areas) + ap = np.sum((recall[indices] - recall[indices-1]) * precision[indices]) + return ap + +def evaluate_detector(gt_boxes_all_samples, pred_boxes_all_samples, classes, iou_threshold): + """Evaluates a single detector's predictions against ground truth.""" + results_by_class = {} + sample_ids = list(gt_boxes_all_samples.keys()) # Get fixed order of sample IDs + + for cls in classes: + all_pred_boxes_cls = [] + num_gt_cls = 0 + pred_sample_indices = [] # Store index from sample_ids for each prediction + + # Collect all GTs and Preds for this class across samples + for i, sample_id in enumerate(sample_ids): + # Use .get() with default empty dict/list for safety + gt_boxes = gt_boxes_all_samples.get(sample_id, {}).get(cls, []) + pred_boxes = pred_boxes_all_samples.get(sample_id, {}).get(cls, []) + + num_gt_cls += len(gt_boxes) + for box in pred_boxes: + all_pred_boxes_cls.append(box) + pred_sample_indices.append(i) # Store the original sample index + + if not all_pred_boxes_cls: # Handle case with no predictions for this class + results_by_class[cls] = { + 'precision': np.array([]), # Use empty numpy arrays + 'recall': np.array([]), + 'ap': 0.0, + 'num_gt': num_gt_cls, + 'num_pred': 0 + } + continue # Skip to next class + + # Sort detections by confidence score (descending) + # Ensure scores exist and are numeric before sorting + scores = [] + valid_indices_for_sorting = [] + for idx, box in enumerate(all_pred_boxes_cls): + if len(box) > IDX_SCORE and isinstance(box[IDX_SCORE], (int, float)): + scores.append(-box[IDX_SCORE]) # Use negative score for descending sort with argsort + valid_indices_for_sorting.append(idx) + else: + warnings.warn(f"Class {cls}: Prediction missing score or invalid score type. Excluding from evaluation: {box}") + + if not valid_indices_for_sorting: # If filtering removed all boxes + results_by_class[cls] = {'precision': np.array([]),'recall': np.array([]),'ap': 0.0,'num_gt': num_gt_cls,'num_pred': 0} + continue + + # Filter lists based on valid scores + all_pred_boxes_cls = [all_pred_boxes_cls[i] for i in valid_indices_for_sorting] + pred_sample_indices = [pred_sample_indices[i] for i in valid_indices_for_sorting] + # Scores list is already built correctly + + # Get the sorted order based on scores + sorted_indices = np.argsort(scores) # argsort sorts ascending on negative scores -> descending order of original scores + + # Reorder the lists based on sorted scores + all_pred_boxes_cls = [all_pred_boxes_cls[i] for i in sorted_indices] + pred_sample_indices = [pred_sample_indices[i] for i in sorted_indices] + + + tp = np.zeros(len(all_pred_boxes_cls)) + fp = np.zeros(len(all_pred_boxes_cls)) + # Track matched GTs per sample: gt_matched[sample_idx][gt_box_idx] = True/False + gt_matched = defaultdict(lambda: defaultdict(bool)) # Indexed by sample_idx, then gt_idx + + # Match predictions + for det_idx, pred_box in enumerate(all_pred_boxes_cls): + sample_idx = pred_sample_indices[det_idx] # Get the original sample index (0 to num_samples-1) + sample_id = sample_ids[sample_idx] # Get the sample_id string using the index + gt_boxes = gt_boxes_all_samples.get(sample_id, {}).get(cls, []) + + best_iou = -1.0 + best_gt_idx = -1 # Index relative to gt_boxes for this sample/class + + if not gt_boxes: # No GT for this class in this specific sample + fp[det_idx] = 1 + continue + + for gt_idx, gt_box in enumerate(gt_boxes): + # Explicitly check class match (belt-and-suspenders) + if pred_box[IDX_CLASS] == gt_box[IDX_CLASS]: + iou = calculate_3d_iou(pred_box, gt_box) + if iou > best_iou: + best_iou = iou + best_gt_idx = gt_idx + # else: # Should not happen if inputs are correctly filtered by class + # pass + + + if best_iou >= iou_threshold: + # Check if this GT box was already matched *in this sample* + if not gt_matched[sample_idx].get(best_gt_idx, False): + tp[det_idx] = 1 + gt_matched[sample_idx][best_gt_idx] = True # Mark as matched for this sample + else: + fp[det_idx] = 1 # Matched a GT box already covered by a higher-scored prediction + else: + fp[det_idx] = 1 # Did not match any available GT box with sufficient IoU + + # Calculate precision/recall + fp_cumsum = np.cumsum(fp) + tp_cumsum = np.cumsum(tp) + + # Avoid division by zero if num_gt_cls is 0 + recall = tp_cumsum / num_gt_cls if num_gt_cls > 0 else np.zeros_like(tp_cumsum, dtype=float) + + # Avoid division by zero if no predictions were made or matched (tp + fp = 0) + denominator = tp_cumsum + fp_cumsum + precision = np.divide(tp_cumsum, denominator, out=np.zeros_like(tp_cumsum, dtype=float), where=denominator!=0) + + + ap = calculate_ap(precision, recall) + + results_by_class[cls] = { + 'precision': precision, # Store as numpy arrays + 'recall': recall, + 'ap': ap, + 'num_gt': num_gt_cls, + 'num_pred': len(all_pred_boxes_cls) # Number of predictions *with valid scores* + } + + return results_by_class + + +def plot_pr_curves(results_all_detectors, classes, output_dir): + """Plots Precision-Recall curves for each class.""" + if not os.path.exists(output_dir): + try: + os.makedirs(output_dir) + except OSError as e: + print(f"[LOG] Error creating output directory {output_dir} for plots: {e}") + return # Cannot save plots + + detector_names = list(results_all_detectors.keys()) + + for cls in classes: + plt.figure(figsize=(10, 7)) + any_results_for_class = False # Track if any detector had results for this class + + for detector_name, results_by_class in results_all_detectors.items(): + if cls in results_by_class and results_by_class[cls]['num_pred'] > 0 : # Check if there were predictions + res = results_by_class[cls] + precision = res['precision'] + recall = res['recall'] + ap = res['ap'] + + # Ensure plotting works even if precision/recall are empty arrays + if recall.size > 0 and precision.size > 0: + # Prepend a point for plotting nicely from recall=0 + plot_recall = np.concatenate(([0.], recall)) + # Use precision[0] if available, else 0. + plot_precision = np.concatenate(([precision[0] if precision.size > 0 else 0.], precision)) + plt.plot(plot_recall, plot_precision, marker='.', markersize=4, linestyle='-', label=f'{detector_name} (AP={ap:.3f})') + any_results_for_class = True + else: # Handle case where num_pred > 0 but P/R arrays somehow ended up empty + plt.plot([0], [0], marker='s', markersize=5, linestyle='', label=f'{detector_name} (AP={ap:.3f}, No P/R data?)') + any_results_for_class = True # Still mark as having results + + + elif cls in results_by_class: # Class exists in evaluation, but no predictions were made for it + num_gt = results_by_class[cls]['num_gt'] + if num_gt > 0: + # Plot a marker indicating no predictions were made for this GT class + plt.plot([0], [0], marker='x', markersize=6, linestyle='', label=f'{detector_name} (No Pred, GT={num_gt})') + # else: # No GT and no predictions for this class, don't plot anything specific + # pass + # else: # Class not even in results dict for this detector (e.g., error during eval?) + # Could happen if detector had no files or all files failed parsing for this class + # Might indicate an issue, but avoid cluttering plot unless needed. + pass + + + if any_results_for_class: + plt.xlabel('Recall') + plt.ylabel('Precision') + plt.title(f'Precision-Recall Curve for Class: {cls}') + plt.legend(loc='lower left') + plt.grid(True) + plt.xlim([-0.05, 1.05]) + plt.ylim([-0.05, 1.05]) + plot_path = os.path.join(output_dir, f'pr_curve_{cls}.png') + try: + plt.savefig(plot_path) + print(f"[LOG] Generated PR curve: {plot_path}") + except Exception as e: + print(f"[LOG] Error saving PR curve plot for class '{cls}': {e}") + finally: + plt.close() # Close the figure regardless of save success + else: + # Check if there was any GT data for this class across all detectors + # Use .get() chain safely + num_gt_total = sum(results_by_class.get(cls, {}).get('num_gt', 0) for results_by_class in results_all_detectors.values()) + if num_gt_total > 0: + print(f" Skipping PR plot for class '{cls}': No predictions found across detectors (GT={num_gt_total}).") + else: + print(f" Skipping PR plot for class '{cls}': No ground truth found.") + plt.close() # Close the empty figure + +def main(): + parser = argparse.ArgumentParser(description='Evaluate N 3D Object Detectors using KITTI format labels.') + parser.add_argument('gt_dir', type=str, help='Directory containing ground truth label files (KITTI format).') + parser.add_argument('pred_dirs', type=str, nargs='+', help='List of directories containing prediction files (KITTI format).') + parser.add_argument('output_dir', type=str, help='Directory to save evaluation results.') + parser.add_argument('--detector_names', type=str, nargs='*', help='Optional names for the detectors (corresponding to pred_dirs). If not provided, uses directory names.') + parser.add_argument('--iou_threshold', type=float, default=0.5, help='3D IoU threshold for matching (default: 0.5). KITTI examples: 0.5 (Car Easy/Mod), 0.7 (Car Hard), 0.5 (Ped/Cyc Easy/Mod), 0.5 (Ped/Cyc Hard).') + parser.add_argument('--classes', type=str, nargs='*', default=['Car', 'Pedestrian', 'Cyclist'], help='List of classes to evaluate (default: KITTI common classes). Case sensitive.') + parser.add_argument('--file_extension', type=str, default='.txt', help='Extension of the label files (default: .txt).') + + args = parser.parse_args() + + ###### Argument Validation and Setup ###### + if not os.path.isdir(args.gt_dir): + print(f"[LOG] Error: Ground truth directory not found: {args.gt_dir}") + return + # Check prediction directories *before* loading GT + valid_pred_dirs = [] + original_detector_names = args.detector_names if args.detector_names else [os.path.basename(d.rstrip('/\\')) for d in args.pred_dirs] + final_detector_names = [] + pred_dir_map = {} # Map name back to directory path + + for i, d in enumerate(args.pred_dirs): + detector_name = original_detector_names[i] + if not os.path.isdir(d): + print(f"[LOG] Prediction directory not found: {d}. Skipping detector '{detector_name}'.") + else: + valid_pred_dirs.append(d) + final_detector_names.append(detector_name) + pred_dir_map[detector_name] = d + + if not valid_pred_dirs: + print("[LOG] Error: No valid prediction directories provided.") + return + + # Ensure output directory exists + try: + if not os.path.exists(args.output_dir): + os.makedirs(args.output_dir) + print(f"[LOG] Created output directory: {args.output_dir}") + except OSError as e: + print(f"[LOG] Error creating output directory {args.output_dir}: {e}") + return + + print("[LOG] Configuration") + print(f"Ground Truth Dir: {args.gt_dir}") + print(f"Prediction Dirs: {valid_pred_dirs}") # Show only valid dirs being used + print(f"Detector Names: {final_detector_names}") # Show corresponding names + print(f"Output Dir: {args.output_dir}") + print(f"IoU Threshold: {args.iou_threshold}") + print(f"Classes: {args.classes}") + print(f"File Extension: {args.file_extension}") + print("=====================\n") + + ###### Load Data ###### + print("[LOG] Loading data...") + try: + gt_files = sorted([f for f in os.listdir(args.gt_dir) if f.endswith(args.file_extension)]) + if not gt_files: + print(f"[LOG] No ground truth files found in {args.gt_dir} with extension {args.file_extension}. Cannot evaluate.") + return + + print(f"Found {len(gt_files)} ground truth files.") + gt_boxes_all_samples = {} # {sample_id: {class: [boxes]}} + # Load GT data first + print("Loading Ground Truth...") + for filename in gt_files: + sample_id = os.path.splitext(filename)[0] + gt_file_path = os.path.join(args.gt_dir, filename) + gt_boxes_all_samples[sample_id] = load_labels(gt_file_path, is_gt=True) + print("Ground Truth loaded.") + + # Load prediction data for valid detectors + pred_boxes_all_detectors = {} # {detector_name: {sample_id: {class: [boxes]}}} + print("Loading Predictions...") + for detector_name in final_detector_names: + pred_dir = pred_dir_map[detector_name] # Get path from map + print(f" Loading from {detector_name} ({pred_dir})...") + pred_boxes_all_detectors[detector_name] = {} + files_found_count = 0 + for filename in gt_files: # Iterate based on GT files for consistency + sample_id = os.path.splitext(filename)[0] + pred_file_path = os.path.join(pred_dir, filename) + # load_labels handles non-existent files, returns empty dict + pred_boxes_all_detectors[detector_name][sample_id] = load_labels(pred_file_path, is_gt=False) + if os.path.exists(pred_file_path): + files_found_count += 1 + print(f" Found {files_found_count} matching prediction files for {detector_name}.") + print("Predictions loaded.") + + print(f"\n[LOG] Loaded data for {len(gt_files)} samples and {len(final_detector_names)} detector(s).\n") + + except Exception as e: + print(f"[LOG] Error during data loading: {e}") + traceback.print_exc() + return + + ###### Evaluation ###### + print("[LOG] Evaluating detectors...") + results_all_detectors = {} # {detector_name: {class: {'ap':, 'p':, 'r':, 'gt':, 'pred':}}} + try: + for detector_name in final_detector_names: # Use the filtered list of names + print(f" Evaluating: {detector_name}") + # Check if prediction data was actually loaded for this detector + if detector_name not in pred_boxes_all_detectors: + print(f" [LOG] Skipping {detector_name} - No prediction data loaded (check LOG)") + results_all_detectors[detector_name] = {} # Store empty results + continue + + pred_boxes_all_samples = pred_boxes_all_detectors[detector_name] + + if not gt_boxes_all_samples: # Redundant check, but safe + print(f" Skipping {detector_name} - No ground truth data loaded.") + results_all_detectors[detector_name] = {} + continue + + # Perform evaluation + results_by_class = evaluate_detector( + gt_boxes_all_samples, + pred_boxes_all_samples, + args.classes, # Pass the user-specified classes + args.iou_threshold + ) + results_all_detectors[detector_name] = results_by_class + print("[LOG] Evaluation complete.\n") + except Exception as e: + print(f"[LOG] Error during evaluation: {e}") + traceback.print_exc() + return # Stop execution on evaluation error + + + ###### Report Results & Save ###### + print("[LOG] Results") + results_file_path = os.path.join(args.output_dir, 'evaluation_metrics.txt') + + try: + with open(results_file_path, 'w') as f: + f.write(f"Evaluation Results (IoU Threshold: {args.iou_threshold})\n") + f.write(f"Evaluated Classes: {', '.join(args.classes)}\n") + f.write("="*60 + "\n") + + overall_mAPs = {} # Store mAP for each detector {detector_name: mAP_value} + + # Process results for each detector that was evaluated + for detector_name in final_detector_names: + # Check if results exist for this detector (might be skipped if loading failed) + if detector_name not in results_all_detectors: + print(f"\nDetector: {detector_name} (SKIPPED - No results)") + f.write(f"\nDetector: {detector_name} (SKIPPED - No results)\n") + continue # Skip to the next detector + + results_by_class = results_all_detectors[detector_name] + print(f"\nDetector: {detector_name}") + f.write(f"\nDetector: {detector_name}\n") + f.write(f"{'Class':<15} | {'AP':<10} | {'Num GT':<10} | {'Num Pred':<10}\n") + f.write("-" * 55 + "\n") + + detector_aps = [] # AP values for classes with GT > 0 for this detector + evaluated_classes_with_gt = [] # Class names with GT > 0 for this detector + + # Iterate through the classes specified by the user for consistent reporting + for cls in args.classes: + # Check if GT data exists for this class across all samples + num_gt_total_for_class = sum(len(gt_boxes.get(cls, [])) for gt_boxes in gt_boxes_all_samples.values()) + + if cls in results_by_class: + # Results exist for this class and detector + res = results_by_class[cls] + ap = res['ap'] + num_gt = res['num_gt'] # Should match num_gt_total_for_class if evaluated correctly + num_pred = res['num_pred'] # Number of valid predictions for this class + print(f"{cls:<15} | {ap:<10.4f} | {num_gt:<10} | {num_pred:<10}") + f.write(f"{cls:<15} | {ap:<10.4f} | {num_gt:<10} | {num_pred:<10}\n") + # Include in mAP calculation only if there were GT boxes for this class + if num_gt > 0: + detector_aps.append(ap) + evaluated_classes_with_gt.append(cls) + # If num_gt is 0 for a class, its AP is 0 or NaN, not included in mAP. + else: + # No results entry for this class, implies 0 valid predictions processed. + # Report AP as 0.0000 if GT existed, otherwise N/A. + print(f"{cls:<15} | {'0.0000' if num_gt_total_for_class > 0 else 'N/A':<10} | {num_gt_total_for_class:<10} | {'0':<10}") + f.write(f"{cls:<15} | {'0.0000' if num_gt_total_for_class > 0 else 'N/A':<10} | {num_gt_total_for_class:<10} | {'0':<10}\n") + # If GT existed, this class contributes 0 to the mAP average. + if num_gt_total_for_class > 0: + detector_aps.append(0.0) + evaluated_classes_with_gt.append(cls) + + # Calculate and report mAP for this detector based on evaluated classes with GT + if detector_aps: # Check if list is not empty + mAP = np.mean(detector_aps) + overall_mAPs[detector_name] = mAP + # Report which classes contributed to the mAP + mAP_info = f"(Classes w/ GT: {', '.join(evaluated_classes_with_gt)})" if evaluated_classes_with_gt else "" + print("-" * 55) + print(f"{'mAP':<15} | {mAP:<10.4f} {mAP_info}") + f.write("-" * 55 + "\n") + f.write(f"{'mAP':<15} | {mAP:<10.4f} {mAP_info}\n") + else: + overall_mAPs[detector_name] = np.nan # Indicate mAP couldn't be computed + print("-" * 55) + print(f"{'mAP':<15} | {'N/A (No GT in evaluated classes)':<10}") + f.write("-" * 55 + "\n") + f.write(f"{'mAP':<15} | {'N/A (No GT in evaluated classes)':<10}\n") + + + ###### Overall Comparison Table ###### + # Check if there are results to compare + if len(final_detector_names) > 0 and results_all_detectors: + print("\n[LOG] Overall Class AP Comparison") + f.write("\n" + "="*60 + "\n") + f.write("Overall Class AP Comparison\n") + f.write("="*60 + "\n") + # Truncate long detector names for table header + header_names = [name[:12] for name in final_detector_names] # Limit name length + header = f"{'Class':<15}" + "".join([f" | {name:<12}" for name in header_names]) + print(header) + f.write(header + "\n") + print("-" * len(header)) + f.write("-" * len(header) + "\n") + + # Iterate through user-specified classes for the table rows + for cls in args.classes: + aps_str = "" + num_gt_total_for_class = sum(len(gt_boxes.get(cls, [])) for gt_boxes in gt_boxes_all_samples.values()) + + # Get AP for each detector for this class + for detector_name in final_detector_names: + # Retrieve AP using .get chains safely + ap_val = results_all_detectors.get(detector_name, {}).get(cls, {}).get('ap', np.nan) + + # Decide display value based on AP and total GT count + if np.isnan(ap_val): + # Show 0.0000 if GT existed, else N/A + display_val = f"{0.0:<12.4f}" if num_gt_total_for_class > 0 else f"{'N/A':<12}" + else: + # Format valid AP value + display_val = f"{ap_val:<12.4f}" + aps_str += f" | {display_val}" + + line = f"{cls:<15}" + aps_str + print(line) + f.write(line + "\n") + + ###### Overall mAP Comparison ###### + print("-" * len(header)) + f.write("-" * len(header) + "\n") + map_str = "" + for detector_name in final_detector_names: + map_val = overall_mAPs.get(detector_name, np.nan) # Get calculated mAP + map_str += f" | {map_val:<12.4f}" if not np.isnan(map_val) else f" | {'N/A':<12}" + map_line = f"{'mAP':<15}" + map_str + print(map_line) + f.write(map_line + "\n") + else: + print("\n[LOG] Skipping overall comparison table - no results available.") + f.write("\n[LOG] Skipping overall comparison table - no results available.\n") + + + print(f"\nMetrics saved to: {results_file_path}") + + except Exception as e: + print(f"[LOG] Error during results reporting/saving: {e}") + traceback.print_exc() + # Continue to plotting if possible + # return # Or stop here + + ###### Plotting ###### + print("\n[LOG] Generating PR curves...") + try: + # Check if results_all_detectors has data before plotting + if results_all_detectors and any(results_all_detectors.values()): + # Pass only valid detector results to plotting function + # Filter out detectors that were skipped or had errors resulting in no results dict + valid_results = {k: v for k, v in results_all_detectors.items() if isinstance(v, dict) and k in final_detector_names} + if valid_results: + plot_pr_curves(valid_results, args.classes, args.output_dir) + print(f"[LOG] PR curve plots potentially saved in: {args.output_dir}") + else: + print("[LOG] Skipping plotting - no valid evaluation results were generated.") + else: + print("[LOG] Skipping plotting - no evaluation results generated.") + + except Exception as e: + print(f"Error during plotting: {e}") + traceback.print_exc() + + print("\n[LOG] Evaluation finished.") + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/GEMstack/onboard/perception/sensorFusion/rosbag_processor.py b/GEMstack/onboard/perception/sensorFusion/rosbag_processor.py new file mode 100644 index 000000000..4742e2b49 --- /dev/null +++ b/GEMstack/onboard/perception/sensorFusion/rosbag_processor.py @@ -0,0 +1,103 @@ +import pypcd +import rospy +import message_filters +from sensor_msgs.msg import Image, PointCloud2 +import cv2 +# import os +from cv_bridge import CvBridge + +import numpy as np +np.float = np.float64 # temp fix for the following import +import ros_numpy + +from pathlib import Path + + +class RosbagProcessor(): + # Pairs up and stores Image and PointCloud2 messages into their respective test (20%) and training (80%) png and bin files. + def __init__(self, DATA_DIR: str, train_counter: int = 0, test_counter: int = 0) -> None: + self.__img_folder = 'image_2' + self.__lidar_folder = 'velodyne' + + self.__TEST_DIR = DATA_DIR / 'testing' + self.__TRAIN_DIR = DATA_DIR / 'training' + TEST_IMG_DIR = self.__TEST_DIR / self.__img_folder + TEST_BIN_DIR = self.__TEST_DIR / self.__lidar_folder + TRAIN_IMG_DIR = self.__TRAIN_DIR / self.__img_folder + TRAIN_BIN_DIR = self.__TRAIN_DIR / self.__lidar_folder + + # Create directories if they don't exist + TEST_IMG_DIR.mkdir(parents=True, exist_ok=True) + TEST_BIN_DIR.mkdir(parents=True, exist_ok=True) + TRAIN_IMG_DIR.mkdir(parents=True, exist_ok=True) + TRAIN_BIN_DIR.mkdir(parents=True, exist_ok=True) + + self.__train_counter = train_counter + self.__test_counter = test_counter + self.__bridge = CvBridge() + + # Subscribers and sychronizers + rospy.init_node('bagListener', anonymous=True) + self.rgb_rosbag = message_filters.Subscriber('/oak/rgb/image_raw', Image) + self.top_lidar_rosbag = message_filters.Subscriber('/ouster/points', PointCloud2) + self.sync = message_filters.ApproximateTimeSynchronizer([self.rgb_rosbag, self.top_lidar_rosbag], queue_size=10, slop=0.1) + self.sync.registerCallback(self.pair_data_callback) + + rospy.spin() + + def pair_data_callback(self, rgb_image_msg: Image, lidar_pc2_msg: PointCloud2): + # Divide up 20% of the data for testing: + if (self.__train_counter + self.__test_counter) % 5 == 0: + PARENT_FOLDER = self.__TEST_DIR + idx = self.__test_counter + else: + PARENT_FOLDER = self.__TRAIN_DIR + idx = self.__train_counter + + # Store point cloud and image data with KITTI formatted filenames: + image_filename = PARENT_FOLDER / self.__img_folder / f"{idx:06d}.png" + pc_filename = PARENT_FOLDER / self.__lidar_folder / f"{idx:06d}.bin" + # image_filename = self.get_path(image_filename) + # pc_filename = self.get_path(pc_filename) + + # Read and write the received image into a KITTI specified png file + cv_image = self.__bridge.imgmsg_to_cv2(rgb_image_msg, desired_encoding='bgr8') + cv2.imwrite(image_filename, cv_image) + + # Read and write the lidar data into a KITTI specified bin file: + # Convert to numpy array and flatten it + pc_array = ros_numpy.point_cloud2.pointcloud2_to_array(lidar_pc2_msg) + pc_array = pc_array.reshape(-1) + + # Create (N, 4) array and fill it + points = np.zeros((pc_array.shape[0], 4), dtype=np.float32) + points[:, 0] = pc_array['x'] + points[:, 1] = pc_array['y'] + points[:, 2] = pc_array['z'] + + # Check for intensity just incase: + if 'intensity' in pc_array.dtype.names: + points[:, 3] = pc_array['intensity'] + else: + rospy.loginfo(f"Intensity was set to zero") + points[:, 3] = 0.0 + + # Save as binary file + with open(pc_filename, 'wb') as f: + f.write(points.tobytes()) + + if (self.__train_counter + self.__test_counter) % 5 == 0: + rospy.loginfo(f"Saved test image and point cloud #{str(self.__test_counter)}") + self.__test_counter += 1 + else: + rospy.loginfo(f"Saved train image and point cloud #{str(self.__train_counter)}") + self.__train_counter += 1 + + +if __name__ == '__main__': + BASE_DIR = Path(__file__).resolve().parent + + # Path to the destination folder (relative from this script) + DATA_DIR = BASE_DIR.parent.parent.parent.parent / 'data' / 'sensorFusionData' + DATA_DIR.mkdir(parents=True, exist_ok=True) + bag_processor = RosbagProcessor(DATA_DIR=DATA_DIR) diff --git a/GEMstack/onboard/perception/sensorFusion/test_eval_3d_bbox_performance.py b/GEMstack/onboard/perception/sensorFusion/test_eval_3d_bbox_performance.py new file mode 100644 index 000000000..6651f50bd --- /dev/null +++ b/GEMstack/onboard/perception/sensorFusion/test_eval_3d_bbox_performance.py @@ -0,0 +1,165 @@ +import os +import sys +import numpy as np +import traceback +import shutil + + +eval_script_filename = "eval_3d_bbox_performance.py" +eval_module_name = "eval_3d_bbox_performance" +try: + if not os.path.exists(eval_script_filename): + raise FileNotFoundError(f"Evaluation script '{eval_script_filename}' not found in the current directory.") + + # Attempt the standard import + imported_module = __import__(eval_module_name) + print(f"Module '{eval_module_name}' imported successfully.") + +except: + print("##################") + print(f"{eval_script_filename} import error") + print("##################") + exit(1) + +#### Generate Dummy Data in KITTI Format ##### + +def create_dummy_kitti_data(base_dir, num_samples=3, classes=['Car', 'Pedestrian'], boxes_per_sample=5, is_pred=False, noise_level=0.1, seed=42): + """Generates dummy data files in KITTI format.""" + if os.path.exists(base_dir): + shutil.rmtree(base_dir) # Clean previous runs + os.makedirs(base_dir) + np.random.seed(seed) # reproducibility + + for i in range(num_samples): + filename = os.path.join(base_dir, f"{i:06d}.txt") + with open(filename, 'w') as f: + num_boxes = np.random.randint(1, boxes_per_sample + 1) + for _ in range(num_boxes): + cls = np.random.choice(classes) + + # Generate somewhat box parameters + h = np.random.uniform(1.4, 1.8) if cls == 'Car' else np.random.uniform(1.5, 1.9) # height + w = np.random.uniform(1.5, 2.0) if cls == 'Car' else np.random.uniform(0.5, 1.0) # width + l = np.random.uniform(3.5, 5.0) if cls == 'Car' else np.random.uniform(0.5, 1.0) # length + + loc_x = np.random.uniform(-15, 15) # center x (lateral) + loc_z = np.random.uniform(5, 50) # center z (depth) + loc_y_bottom = np.random.uniform(1.6, 1.7) # Approximate height of bottom relative to camera origin + rot_y = np.random.uniform(-np.pi/2, np.pi/2) # Yaw + + # Placeholder values + truncated = 0.0 + occluded = 0 # 0=visible + alpha = -10 + bbox_2d = [0.0, 0.0, 50.0, 50.0] + score = np.random.uniform(0.5, 1.0) + + # Add noise for predictions + if is_pred: + h *= np.random.normal(1, noise_level * 0.1) + w *= np.random.normal(1, noise_level * 0.1) + l *= np.random.normal(1, noise_level * 0.1) + loc_x += np.random.normal(0, noise_level * 1.0) + loc_y_bottom += np.random.normal(0, noise_level * 0.1) + loc_z += np.random.normal(0, noise_level * 3.0) + rot_y += np.random.normal(0, noise_level * np.pi/8) + h, w, l = max(0.1, h), max(0.1, w), max(0.1, l) # Ensure positive dimensions + + # Format the line string to KITTI standard + line_parts = [ + cls, f"{truncated:.2f}", f"{occluded:d}", f"{alpha:.2f}", + f"{bbox_2d[0]:.2f}", f"{bbox_2d[1]:.2f}", f"{bbox_2d[2]:.2f}", f"{bbox_2d[3]:.2f}", + f"{h:.2f}", f"{w:.2f}", f"{l:.2f}", + f"{loc_x:.2f}", f"{loc_y_bottom:.2f}", f"{loc_z:.2f}", + f"{rot_y:.2f}" + ] + + if is_pred: + line_parts.append(f"{score:.4f}") + + f.write(" ".join(line_parts) + "\n") + +# --- Setup Directories --- +base_output_dir = "./kitti_eval_demo" +gt_dir = os.path.join(base_output_dir, "dummy_gt_kitti") +pred1_dir = os.path.join(base_output_dir, "dummy_pred1_kitti") +pred2_dir = os.path.join(base_output_dir, "dummy_pred2_kitti") +eval_output_dir = os.path.join(base_output_dir, "eval_output") + +# --- Generate KITTI-formatted Dummy Data --- +print("\n[LOG] Generating dummy data (KITTI format) for demonstration...") +try: + create_dummy_kitti_data(gt_dir, num_samples=5, classes=['Car', 'Pedestrian'], is_pred=False, seed=42) + create_dummy_kitti_data(pred1_dir, num_samples=5, classes=['Car', 'Pedestrian'], is_pred=True, noise_level=0.1, seed=101) + create_dummy_kitti_data(pred2_dir, num_samples=5, classes=['Car', 'Pedestrian'], is_pred=True, noise_level=0.3, seed=202) + missing_pred_file = os.path.join(pred2_dir, "000004.txt") + if os.path.exists(missing_pred_file): + os.remove(missing_pred_file) + print(f"Removed {missing_pred_file} to simulate missing prediction.") + print("Dummy data generated.") +except Exception as e: + print(f"Error generating dummy data: {e}") + traceback.print_exc() + exit(1) + +# --- Run the Evaluation Script --- +print(f"\n[LOG] #### Run Evaluation ") +cmd_args = [ + gt_dir, + pred1_dir, + pred2_dir, + eval_output_dir, + '--detector_names', 'DetectorA_Imported', 'DetectorB_Imported', + '--iou_threshold', '0.7', + '--classes', 'Car', 'Pedestrian' +] + + +original_argv = sys.argv +sys.argv = [eval_script_filename] + cmd_args +exit_code = 0 + +try: + print(f"[LOG] call {eval_module_name}.main()...") + imported_module.main() + +except AttributeError: + exit_code = 1 +except Exception as e: + traceback.print_exc() + exit_code = 1 +finally: + # Restore original sys.argv + sys.argv = original_argv + +##### List generated output files ##### +print("\n--- Generated Output Files ---") +if os.path.exists(eval_output_dir): + try: + output_files = [os.path.join(eval_output_dir, item) for item in os.listdir(eval_output_dir)] + if output_files: + for item_path in sorted(output_files): # Sort for consistent listing + print(item_path) + else: + print(f"Output directory '{eval_output_dir}' is empty.") + except Exception as e: + print(f"Error listing output directory {eval_output_dir}: {e}") +else: + print(f"Output directory '{eval_output_dir}' not created or accessible.") + +# Display contents of the metrics file if execution was successful +metrics_file = os.path.join(eval_output_dir, 'evaluation_metrics.txt') +if exit_code == 0 and os.path.exists(metrics_file): + print(f"\n--- Content of {metrics_file} ---") + try: + with open(metrics_file, 'r') as f: + print(f.read()) + except Exception as e: + print(f"Error reading metrics file {metrics_file}: {e}") +elif exit_code != 0: + # Use syntax compatible with Python < 3.8 + print(f"\n--- Metrics file not displayed due to execution error (exit_code={exit_code}) ---") +elif not os.path.exists(metrics_file): + print(f"\n--- Metrics file not found: {metrics_file} ---") + +print(f"\nDemo finished. Check the '{base_output_dir}' directory for generated data and results.") \ No newline at end of file diff --git a/GEMstack/state/agent.py b/GEMstack/state/agent.py index 757805837..2ca23ba76 100644 --- a/GEMstack/state/agent.py +++ b/GEMstack/state/agent.py @@ -11,6 +11,7 @@ class AgentEnum(Enum): LARGE_TRUCK = 2 PEDESTRIAN = 3 BICYCLIST = 4 + CONE = 5 class AgentActivityEnum(Enum): @@ -18,6 +19,9 @@ class AgentActivityEnum(Enum): MOVING = 1 # standard motion. Predictions will be used here FAST = 2 # indicates faster than usual motion, e.g., runners. UNDETERMINED = 3 # unknown activity + STANDING = 4 # standing cone + LEFT = 5 # flipped cone facing left + RIGHT = 6 # flipped cone facing right @dataclass @@ -41,4 +45,4 @@ def velocity_parent(self) -> Tuple[float,float,float]: def to_frame(self, frame : ObjectFrameEnum, current_pose = None, start_pose_abs = None) -> AgentState: newpose = self.pose.to_frame(frame,current_pose,start_pose_abs) newvelocity = convert_vector(self.velocity,self.pose.frame,frame,current_pose,start_pose_abs) - return replace(self,pose=newpose,velocity=newvelocity) \ No newline at end of file + return replace(self,pose=newpose,velocity=newvelocity) diff --git a/launch/cone_detection.yaml b/launch/cone_detection.yaml new file mode 100644 index 000000000..165a5b9b8 --- /dev/null +++ b/launch/cone_detection.yaml @@ -0,0 +1,119 @@ +description: "Run the yielding trajectory planner on the real vehicle with real perception" +mode: hardware +vehicle_interface: gem_hardware.GEMHardwareInterface +mission_execution: StandardExecutor + +# Recovery behavior after a component failure +recovery: + planning: + trajectory_tracking : recovery.StopTrajectoryTracker + +# Driving behavior for the GEM vehicle. Runs real pedestrian perception, yield planner, but does not send commands to real vehicle. +drive: + perception: + state_estimation : GNSSStateEstimator + agent_detection : cone_detection.ConeDetector3D + perception_normalization : StandardPerceptionNormalizer + planning: + relations_estimation: + type: pedestrian_yield_logic.PedestrianYielder + args: + mode: 'real' + params: { + 'yielder': 'expert', # 'expert', 'analytic', or 'simulation' + 'planner': 'milestone', # 'milestone', 'dt', or 'dx' + 'desired_speed': 1.0, # m/s, 1.5 m/s seems max speed? Feb24 + 'acceleration': 0.75 # m/s2, 0.5 is not enough to start moving. Feb24 + } + route_planning: + type: StaticRoutePlanner + args: [!relative_path '../GEMstack/knowledge/routes/forward_15m.csv','start'] + motion_planning: longitudinal_planning.YieldTrajectoryPlanner + trajectory_tracking: + type: pure_pursuit.PurePursuitTrajectoryTracker + print: False + + +log: + # Specify the top-level folder to save the log files. Default is 'logs' + #folder : 'logs' + # If prefix is specified, then the log folder will be named with the prefix followed by the date and time. Default no prefix + #prefix : 'fixed_route_' + # If suffix is specified, then logs will output to folder/prefix+suffix. Default uses date and time as the suffix + #suffix : 'test3' + # Specify which ros topics to record to vehicle.bag. Default records nothing. This records the "standard" ROS topics. + ros_topics : + # Specify options to pass to rosbag record. Default is no options. + #rosbag_options : '--split --size=1024' + # If True, then record all readings / commands of the vehicle interface. Default False + vehicle_interface : True + # Specify which components to record to behavior.json. Default records nothing + components : ['state_estimation','agent_detection','motion_planning'] + # Specify which components of state to record to state.json. Default records nothing + #state: ['all'] + # Specify the rate in Hz at which to record state to state.json. Default records at the pipeline's rate + #state_rate: 10 + # If True, then make plots of the recorded data specifically from PurePursuitTrajectoryTracker_debug.csv and report metrics. Default False + #auto_plot : True +replay: # Add items here to set certain topics / inputs to be replayed from logs + # Specify which log folder to replay from + log: + ros_topics : [] + components : [] + +#usually can keep this constant +computation_graph: !include "../GEMstack/knowledge/defaults/computation_graph.yaml" + +variants: + detector_only: + run: + description: "Run the pedestrian detection code" + drive: + planning: + trajectory_tracking: + real_sim: + run: + description: "Run the pedestrian detection code with real detection and fake simulation" + mode: hardware + vehicle_interface: + type: gem_mixed.GEMRealSensorsWithSimMotionInterface + args: + scene: !relative_path '../scenes/xyhead_demo.yaml' + mission_execution: StandardExecutor + require_engaged: False + visualization: !include "klampt_visualization.yaml" + drive: + perception: + state_estimation : OmniscientStateEstimator + planning: + relations_estimation: + type: pedestrian_yield_logic.PedestrianYielder + args: + mode: 'sim' + + fake_real: + run: + description: "Run the yielding trajectory planner on the real vehicle with faked perception" + drive: + perception: + agent_detection : cone_detection.FakeConeDetector2D + + fake_sim: + run: + description: "Run the yielding trajectory planner in simulation with faked perception" + mode: simulation + vehicle_interface: + type: gem_simulator.GEMDoubleIntegratorSimulationInterface + args: + scene: !relative_path '../scenes/xyhead_demo.yaml' + visualization: !include "klampt_visualization.yaml" + drive: + perception: + # agent_detection : pedestrian_detection.FakePedestrianDetector2D + agent_detection : OmniscientAgentDetector #this option reads agents from the simulator + state_estimation : OmniscientStateEstimator + planning: + relations_estimation: + type: pedestrian_yield_logic.PedestrianYielder + args: + mode: 'sim'