diff --git a/GEMstack/onboard/perception/cone_detection.py b/GEMstack/onboard/perception/cone_detection.py index ba6a12872..5ab875d93 100644 --- a/GEMstack/onboard/perception/cone_detection.py +++ b/GEMstack/onboard/perception/cone_detection.py @@ -1,268 +1,22 @@ from ...state import AllState, VehicleState, ObjectPose, ObjectFrameEnum, AgentState, AgentEnum, AgentActivityEnum from ..interface.gem import GEMInterface from ..component import Component +from .perception_utils import * from ultralytics import YOLO import cv2 from typing import Dict import open3d as o3d import numpy as np -from sklearn.cluster import DBSCAN from scipy.spatial.transform import Rotation as R import rospy from sensor_msgs.msg import PointCloud2, Image -import sensor_msgs.point_cloud2 as pc2 -import struct, ctypes from message_filters import Subscriber, ApproximateTimeSynchronizer from cv_bridge import CvBridge import time -import math -import ros_numpy import os import yaml -# ----- Helper Functions ----- - -def cylindrical_roi(points, center, radius, height): - horizontal_dist = np.linalg.norm(points[:, :2] - center[:2], axis=1) - vertical_diff = np.abs(points[:, 2] - center[2]) - mask = (horizontal_dist <= radius) & (vertical_diff <= height / 2) - return points[mask] - - -def filter_points_within_threshold(points, threshold=15.0): - distances = np.linalg.norm(points, axis=1) - mask = distances <= threshold - return points[mask] - -def match_existing_cone( - new_center: np.ndarray, - new_dims: tuple, - existing_agents: Dict[str, AgentState], - distance_threshold: float = 1.0 -) -> str: - """ - Find the closest existing Cone agent within a specified distance threshold. 
- """ - best_agent_id = None - best_dist = float('inf') - for agent_id, agent_state in existing_agents.items(): - old_center = np.array([agent_state.pose.x, agent_state.pose.y, agent_state.pose.z]) - dist = np.linalg.norm(new_center - old_center) - if dist < distance_threshold and dist < best_dist: - best_dist = dist - best_agent_id = agent_id - return best_agent_id - - -def compute_velocity(old_pose: ObjectPose, new_pose: ObjectPose, dt: float) -> tuple: - """ - Compute the (vx, vy, vz) velocity based on change in pose over time. - """ - if dt <= 0: - return (0, 0, 0) - vx = (new_pose.x - old_pose.x) / dt - vy = (new_pose.y - old_pose.y) / dt - vz = (new_pose.z - old_pose.z) / dt - return (vx, vy, vz) - - -def extract_roi_box(lidar_pc, center, half_extents): - """ - Extract a region of interest (ROI) from the LiDAR point cloud defined by an axis-aligned bounding box. - """ - lower = center - half_extents - upper = center + half_extents - mask = np.all((lidar_pc >= lower) & (lidar_pc <= upper), axis=1) - return lidar_pc[mask] - - -def pc2_to_numpy(pc2_msg, want_rgb=False): - """ - Convert a ROS PointCloud2 message into a numpy array quickly using ros_numpy. - This function extracts the x, y, z coordinates from the point cloud. - """ - # Convert the ROS message to a numpy structured array - pc = ros_numpy.point_cloud2.pointcloud2_to_array(pc2_msg) - # Stack x,y,z fields to a (N,3) array - pts = np.stack((np.array(pc['x']).ravel(), - np.array(pc['y']).ravel(), - np.array(pc['z']).ravel()), axis=1) - # Apply filtering (for example, x > 0 and z in a specified range) - mask = (pts[:, 0] > -0.5) & (pts[:, 2] < -1) & (pts[:, 2] > -2.7) - return pts[mask] - - -def backproject_pixel(u, v, K): - """ - Backprojects a pixel coordinate (u, v) into a normalized 3D ray in the camera coordinate system. 
- """ - cx, cy = K[0, 2], K[1, 2] - fx, fy = K[0, 0], K[1, 1] - x = (u - cx) / fx - y = (v - cy) / fy - ray_dir = np.array([x, y, 1.0]) - return ray_dir / np.linalg.norm(ray_dir) - - -def find_human_center_on_ray(lidar_pc, ray_origin, ray_direction, - t_min, t_max, t_step, - distance_threshold, min_points, ransac_threshold): - """ - Identify the center of a human along a projected ray. - (This function is no longer used in the new approach.) - """ - return None, None, None - - -def extract_roi(pc, center, roi_radius): - """ - Extract points from a point cloud that lie within a specified radius of a center point. - """ - distances = np.linalg.norm(pc - center, axis=1) - return pc[distances < roi_radius] - - -def refine_cluster(roi_points, center, eps=0.2, min_samples=10): - """ - Refine a point cluster by applying DBSCAN and return the cluster closest to 'center'. - """ - if roi_points.shape[0] < min_samples: - return roi_points - clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(roi_points) - labels = clustering.labels_ - valid_clusters = [roi_points[labels == l] for l in set(labels) if l != -1] - if not valid_clusters: - return roi_points - best_cluster = min(valid_clusters, key=lambda c: np.linalg.norm(np.mean(c, axis=0) - center)) - return best_cluster - - -def remove_ground_by_min_range(cluster, z_range=0.05): - """ - Remove points within z_range of the minimum z (assumed to be ground). - """ - if cluster is None or cluster.shape[0] == 0: - return cluster - min_z = np.min(cluster[:, 2]) - filtered = cluster[cluster[:, 2] > (min_z + z_range)] - return filtered - - -def get_bounding_box_center_and_dimensions(points): - """ - Calculate the axis-aligned bounding box's center and dimensions for a set of 3D points. 
- """ - if points.shape[0] == 0: - return None, None - min_vals = np.min(points, axis=0) - max_vals = np.max(points, axis=0) - center = (min_vals + max_vals) / 2 - dimensions = max_vals - min_vals - return center, dimensions - - -def create_ray_line_set(start, end): - """ - Create an Open3D LineSet object representing a ray between two 3D points. - The line is colored yellow. - """ - points = [start, end] - lines = [[0, 1]] - line_set = o3d.geometry.LineSet() - line_set.points = o3d.utility.Vector3dVector(points) - line_set.lines = o3d.utility.Vector2iVector(lines) - line_set.colors = o3d.utility.Vector3dVector([[1, 1, 0]]) - return line_set - - -def downsample_points(lidar_points, voxel_size=0.15): - pcd = o3d.geometry.PointCloud() - pcd.points = o3d.utility.Vector3dVector(lidar_points) - down_pcd = pcd.voxel_down_sample(voxel_size=voxel_size) - return np.asarray(down_pcd.points) - - -def filter_depth_points(lidar_points, max_depth_diff=0.9, use_norm=True): - if lidar_points.shape[0] == 0: - return lidar_points - - if use_norm: - depths = np.linalg.norm(lidar_points, axis=1) - else: - depths = lidar_points[:, 0] - - min_depth = np.min(depths) - max_possible_depth = min_depth + max_depth_diff - mask = depths < max_possible_depth - return lidar_points[mask] - - -def visualize_geometries(geometries, window_name="Open3D", width=800, height=600, point_size=5.0): - """ - Visualize a list of Open3D geometry objects in a dedicated window. - """ - vis = o3d.visualization.Visualizer() - vis.create_window(window_name=window_name, width=width, height=height) - for geom in geometries: - vis.add_geometry(geom) - opt = vis.get_render_option() - opt.point_size = point_size - vis.run() - vis.destroy_window() - - -def pose_to_matrix(pose): - """ - Compose a 4x4 transformation matrix from a pose state. - Assumes pose has attributes: x, y, z, yaw, pitch, roll, - where the angles are given in degrees. 
- """ - x = pose.x if pose.x is not None else 0.0 - y = pose.y if pose.y is not None else 0.0 - z = pose.z if pose.z is not None else 0.0 - if pose.yaw is not None and pose.pitch is not None and pose.roll is not None: - yaw = pose.yaw - pitch = pose.pitch - roll = pose.roll - else: - yaw = 0.0 - pitch = 0.0 - roll = 0.0 - R_mat = R.from_euler('zyx', [yaw, pitch, roll]).as_matrix() - T = np.eye(4) - T[:3, :3] = R_mat - T[:3, 3] = np.array([x, y, z]) - return T - - -def transform_points_l2c(lidar_points, T_l2c): - N = lidar_points.shape[0] - pts_hom = np.hstack((lidar_points, np.ones((N, 1)))) # (N,4) - pts_cam = (T_l2c @ pts_hom.T).T # (N,4) - return pts_cam[:, :3] - - -# ----- New: Vectorized projection function ----- -def project_points(pts_cam, K, original_lidar_points): - """ - Vectorized version. - pts_cam: (N,3) array of points in camera coordinates. - original_lidar_points: (N,3) array of points in LiDAR coordinates. - Returns a (M,5) array: [u, v, X_lidar, Y_lidar, Z_lidar] for all points with Z>0. - """ - mask = pts_cam[:, 2] > 0 - pts_cam_valid = pts_cam[mask] - lidar_valid = original_lidar_points[mask] - Xc = pts_cam_valid[:, 0] - Yc = pts_cam_valid[:, 1] - Zc = pts_cam_valid[:, 2] - u = (K[0, 0] * (Xc / Zc) + K[0, 2]).astype(np.int32) - v = (K[1, 1] * (Yc / Zc) + K[1, 2]).astype(np.int32) - proj = np.column_stack((u, v, lidar_valid)) - return proj - - class ConeDetector3D(Component): """ Detects cones by fusing YOLO 2D detections with LiDAR point cloud data. 
@@ -403,61 +157,29 @@ def undistort_image(self, image, K, D): return undistorted, newK def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: - - start = time.time() downsample = False + # Gate guards against data not being present for both sensors: if self.latest_image is None or self.latest_lidar is None: return {} lastest_image = self.latest_image.copy() - # Ensure data/ exists and build timestamp + + # Set up current time variables + start = time.time() + current_time = self.vehicle_interface.time() + if downsample: lidar_down = downsample_points(self.latest_lidar, voxel_size=0.1) else: lidar_down = self.latest_lidar.copy() - current_time = self.vehicle_interface.time() + if self.start_time is None: self.start_time = current_time time_elapsed = current_time - self.start_time + # Ensure data/ exists and build timestamp if self.save_data: - os.makedirs("data", exist_ok=True) - tstamp = int(self.vehicle_interface.time() * 1000) - # 1) Dump raw image - cv2.imwrite(f"data/{tstamp}_image.png", lastest_image) - # 2) Dump raw LiDAR - np.savez(f"data/{tstamp}_lidar.npz", lidar=self.latest_lidar) - # 3) Write BEFORE_TRANSFORM - with open(f"data/{tstamp}_vehstate.txt", "w") as f: - vp = vehicle.pose - f.write( - f"BEFORE_TRANSFORM " - f"x={vp.x:.3f}, y={vp.y:.3f}, z={vp.z:.3f}, " - f"yaw={vp.yaw:.2f}, pitch={vp.pitch:.2f}, roll={vp.roll:.2f}\n" - ) - # Compute vehicle_start_pose in either START or CURRENT - if self.use_start_frame: - if self.start_pose_abs is None: - self.start_pose_abs = vehicle.pose - vehicle_start_pose = vehicle.pose.to_frame( - ObjectFrameEnum.START, - vehicle.pose, - self.start_pose_abs - ) - mode = "START" - else: - vehicle_start_pose = vehicle.pose - mode = "CURRENT" - with open(f"data/{tstamp}_vehstate.txt", "a") as f: - f.write( - f"AFTER_TRANSFORM " - f"x={vehicle_start_pose.x:.3f}, " - f"y={vehicle_start_pose.y:.3f}, " - f"z={vehicle_start_pose.z:.3f}, " - f"yaw={vehicle_start_pose.yaw:.2f}, " - 
f"pitch={vehicle_start_pose.pitch:.2f}, " - f"roll={vehicle_start_pose.roll:.2f}, " - f"frame={mode}\n" - ) + self.save_sensor_data(vehicle=vehicle, latest_image=lastest_image) + if self.camera_front == False: start = time.time() undistorted_img, current_K = self.undistort_image(lastest_image, self.K, self.D) @@ -500,6 +222,8 @@ def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: cx, cy, w, h = box combined_boxes.append((cx, cy, w, h, AgentActivityEnum.STANDING)) + # Visualize the received images in 2D with their corresponding labels + # It draws rectangles and labels on the images: if getattr(self, 'visualize_2d', False): for (cx, cy, w, h, activity) in combined_boxes: left = int(cx - w / 2) @@ -524,8 +248,15 @@ def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: cv2.imshow("Detection - Cone 2D", undistorted_img) start = time.time() + # Transform the lidar points from lidar frame of reference to camera EXTRINSIC frame of reference. + # Then project the pixels onto the lidar points to "paint them" (essentially determine which points are associated with detected objects) pts_cam = transform_points_l2c(lidar_down, self.T_l2c) projected_pts = project_points(pts_cam, self.current_K, lidar_down) + # What is returned: + # projected_pts[:, 0]: u-coordinate in the image (horizontal pixel position) + # projected_pts[:, 1]: v-coordinate in the image (vertical pixel position) + # projected_pts[:, 2:5]: original X, Y, Z coordinates in the LiDAR frame + end = time.time() # print('-------processing time1---', end -start) @@ -702,6 +433,46 @@ def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: # print('-------processing time', end -start) return self.tracked_agents + def save_sensor_data(self, vehicle: VehicleState, latest_image) -> None: + os.makedirs("data", exist_ok=True) + tstamp = int(self.vehicle_interface.time() * 1000) + # 1) Dump raw image + cv2.imwrite(f"data/{tstamp}_image.png", latest_image) + # 2) Dump raw LiDAR + 
np.savez(f"data/{tstamp}_lidar.npz", lidar=self.latest_lidar) + # 3) Write BEFORE_TRANSFORM + with open(f"data/{tstamp}_vehstate.txt", "w") as f: + vp = vehicle.pose + f.write( + f"BEFORE_TRANSFORM " + f"x={vp.x:.3f}, y={vp.y:.3f}, z={vp.z:.3f}, " + f"yaw={vp.yaw:.2f}, pitch={vp.pitch:.2f}, roll={vp.roll:.2f}\n" + ) + # Compute vehicle_start_pose in either START or CURRENT + if self.use_start_frame: + if self.start_pose_abs is None: + self.start_pose_abs = vehicle.pose + vehicle_start_pose = vehicle.pose.to_frame( + ObjectFrameEnum.START, + vehicle.pose, + self.start_pose_abs + ) + mode = "START" + else: + vehicle_start_pose = vehicle.pose + mode = "CURRENT" + with open(f"data/{tstamp}_vehstate.txt", "a") as f: + f.write( + f"AFTER_TRANSFORM " + f"x={vehicle_start_pose.x:.3f}, " + f"y={vehicle_start_pose.y:.3f}, " + f"z={vehicle_start_pose.z:.3f}, " + f"yaw={vehicle_start_pose.yaw:.2f}, " + f"pitch={vehicle_start_pose.pitch:.2f}, " + f"roll={vehicle_start_pose.roll:.2f}, " + f"frame={mode}\n" + ) + # ----- Fake Cone Detector 2D (for Testing Purposes) ----- diff --git a/GEMstack/onboard/perception/perception_utils.py b/GEMstack/onboard/perception/perception_utils.py new file mode 100644 index 000000000..a80dc1eae --- /dev/null +++ b/GEMstack/onboard/perception/perception_utils.py @@ -0,0 +1,251 @@ +from ...state import ObjectPose, AgentState +from typing import Dict +import open3d as o3d +import numpy as np +from sklearn.cluster import DBSCAN +from scipy.spatial.transform import Rotation as R +from sensor_msgs.msg import PointCloud2 +import sensor_msgs.point_cloud2 as pc2 +import ros_numpy + + +# ----- Helper Functions ----- + +def cylindrical_roi(points, center, radius, height): + horizontal_dist = np.linalg.norm(points[:, :2] - center[:2], axis=1) + vertical_diff = np.abs(points[:, 2] - center[2]) + mask = (horizontal_dist <= radius) & (vertical_diff <= height / 2) + return points[mask] + + +def filter_points_within_threshold(points, threshold=15.0): + distances = 
np.linalg.norm(points, axis=1) + mask = distances <= threshold + return points[mask] + +def match_existing_cone( + new_center: np.ndarray, + new_dims: tuple, + existing_agents: Dict[str, AgentState], + distance_threshold: float = 1.0 +) -> str: + """ + Find the closest existing Cone agent within a specified distance threshold. + """ + best_agent_id = None + best_dist = float('inf') + for agent_id, agent_state in existing_agents.items(): + old_center = np.array([agent_state.pose.x, agent_state.pose.y, agent_state.pose.z]) + dist = np.linalg.norm(new_center - old_center) + if dist < distance_threshold and dist < best_dist: + best_dist = dist + best_agent_id = agent_id + return best_agent_id + + +def compute_velocity(old_pose: ObjectPose, new_pose: ObjectPose, dt: float) -> tuple: + """ + Compute the (vx, vy, vz) velocity based on change in pose over time. + """ + if dt <= 0: + return (0, 0, 0) + vx = (new_pose.x - old_pose.x) / dt + vy = (new_pose.y - old_pose.y) / dt + vz = (new_pose.z - old_pose.z) / dt + return (vx, vy, vz) + + +def extract_roi_box(lidar_pc, center, half_extents): + """ + Extract a region of interest (ROI) from the LiDAR point cloud defined by an axis-aligned bounding box. + """ + lower = center - half_extents + upper = center + half_extents + mask = np.all((lidar_pc >= lower) & (lidar_pc <= upper), axis=1) + return lidar_pc[mask] + + +def pc2_to_numpy(pc2_msg, want_rgb=False): + """ + Convert a ROS PointCloud2 message into a numpy array quickly using ros_numpy. + This function extracts the x, y, z coordinates from the point cloud. 
+ """ + # Convert the ROS message to a numpy structured array + pc = ros_numpy.point_cloud2.pointcloud2_to_array(pc2_msg) + # Stack x,y,z fields to a (N,3) array + pts = np.stack((np.array(pc['x']).ravel(), + np.array(pc['y']).ravel(), + np.array(pc['z']).ravel()), axis=1) + # Apply filtering (for example, x > 0 and z in a specified range) + mask = (pts[:, 0] > -0.5) & (pts[:, 2] < -1) & (pts[:, 2] > -2.7) + return pts[mask] + + +def backproject_pixel(u, v, K): + """ + Backprojects a pixel coordinate (u, v) into a normalized 3D ray in the camera coordinate system. + """ + cx, cy = K[0, 2], K[1, 2] + fx, fy = K[0, 0], K[1, 1] + x = (u - cx) / fx + y = (v - cy) / fy + ray_dir = np.array([x, y, 1.0]) + return ray_dir / np.linalg.norm(ray_dir) + + +def find_human_center_on_ray(lidar_pc, ray_origin, ray_direction, + t_min, t_max, t_step, + distance_threshold, min_points, ransac_threshold): + """ + Identify the center of a human along a projected ray. + (This function is no longer used in the new approach.) + """ + return None, None, None + + +def extract_roi(pc, center, roi_radius): + """ + Extract points from a point cloud that lie within a specified radius of a center point. + """ + distances = np.linalg.norm(pc - center, axis=1) + return pc[distances < roi_radius] + + +def refine_cluster(roi_points, center, eps=0.2, min_samples=10): + """ + Refine a point cluster by applying DBSCAN and return the cluster closest to 'center'. + """ + if roi_points.shape[0] < min_samples: + return roi_points + clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(roi_points) + labels = clustering.labels_ + valid_clusters = [roi_points[labels == l] for l in set(labels) if l != -1] + if not valid_clusters: + return roi_points + best_cluster = min(valid_clusters, key=lambda c: np.linalg.norm(np.mean(c, axis=0) - center)) + return best_cluster + + +def remove_ground_by_min_range(cluster, z_range=0.05): + """ + Remove points within z_range of the minimum z (assumed to be ground). 
+ """ + if cluster is None or cluster.shape[0] == 0: + return cluster + min_z = np.min(cluster[:, 2]) + filtered = cluster[cluster[:, 2] > (min_z + z_range)] + return filtered + + +def get_bounding_box_center_and_dimensions(points): + """ + Calculate the axis-aligned bounding box's center and dimensions for a set of 3D points. + """ + if points.shape[0] == 0: + return None, None + min_vals = np.min(points, axis=0) + max_vals = np.max(points, axis=0) + center = (min_vals + max_vals) / 2 + dimensions = max_vals - min_vals + return center, dimensions + + +def create_ray_line_set(start, end): + """ + Create an Open3D LineSet object representing a ray between two 3D points. + The line is colored yellow. + """ + points = [start, end] + lines = [[0, 1]] + line_set = o3d.geometry.LineSet() + line_set.points = o3d.utility.Vector3dVector(points) + line_set.lines = o3d.utility.Vector2iVector(lines) + line_set.colors = o3d.utility.Vector3dVector([[1, 1, 0]]) + return line_set + + +def downsample_points(lidar_points, voxel_size=0.15): + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(lidar_points) + down_pcd = pcd.voxel_down_sample(voxel_size=voxel_size) + return np.asarray(down_pcd.points) + + +def filter_depth_points(lidar_points, max_depth_diff=0.9, use_norm=True): + if lidar_points.shape[0] == 0: + return lidar_points + + if use_norm: + depths = np.linalg.norm(lidar_points, axis=1) + else: + depths = lidar_points[:, 0] + + min_depth = np.min(depths) + max_possible_depth = min_depth + max_depth_diff + mask = depths < max_possible_depth + return lidar_points[mask] + + +def visualize_geometries(geometries, window_name="Open3D", width=800, height=600, point_size=5.0): + """ + Visualize a list of Open3D geometry objects in a dedicated window. 
+ """ + vis = o3d.visualization.Visualizer() + vis.create_window(window_name=window_name, width=width, height=height) + for geom in geometries: + vis.add_geometry(geom) + opt = vis.get_render_option() + opt.point_size = point_size + vis.run() + vis.destroy_window() + + +def pose_to_matrix(pose): + """ + Compose a 4x4 transformation matrix from a pose state. + Assumes pose has attributes: x, y, z, yaw, pitch, roll, + where the angles are given in degrees. + """ + x = pose.x if pose.x is not None else 0.0 + y = pose.y if pose.y is not None else 0.0 + z = pose.z if pose.z is not None else 0.0 + if pose.yaw is not None and pose.pitch is not None and pose.roll is not None: + yaw = pose.yaw + pitch = pose.pitch + roll = pose.roll + else: + yaw = 0.0 + pitch = 0.0 + roll = 0.0 + R_mat = R.from_euler('zyx', [yaw, pitch, roll]).as_matrix() + T = np.eye(4) + T[:3, :3] = R_mat + T[:3, 3] = np.array([x, y, z]) + return T + + +def transform_points_l2c(lidar_points, T_l2c): + N = lidar_points.shape[0] + pts_hom = np.hstack((lidar_points, np.ones((N, 1)))) # (N,4) + pts_cam = (T_l2c @ pts_hom.T).T # (N,4) + return pts_cam[:, :3] + + +# ----- New: Vectorized projection function ----- +def project_points(pts_cam, K, original_lidar_points): + """ + Vectorized version. + pts_cam: (N,3) array of points in camera coordinates. + original_lidar_points: (N,3) array of points in LiDAR coordinates. + Returns a (M,5) array: [u, v, X_lidar, Y_lidar, Z_lidar] for all points with Z>0. + """ + mask = pts_cam[:, 2] > 0 + pts_cam_valid = pts_cam[mask] + lidar_valid = original_lidar_points[mask] + Xc = pts_cam_valid[:, 0] + Yc = pts_cam_valid[:, 1] + Zc = pts_cam_valid[:, 2] + u = (K[0, 0] * (Xc / Zc) + K[0, 2]).astype(np.int32) + v = (K[1, 1] * (Yc / Zc) + K[1, 2]).astype(np.int32) + proj = np.column_stack((u, v, lidar_valid)) + return proj \ No newline at end of file