from ...state import ObjectPose, AgentState
from typing import Dict, Optional
import open3d as o3d
import numpy as np
from sklearn.cluster import DBSCAN
from scipy.spatial.transform import Rotation as R
from sensor_msgs.msg import PointCloud2
import sensor_msgs.point_cloud2 as pc2
import ros_numpy


# ----- Helper Functions -----

def cylindrical_roi(points, center, radius, height):
    """
    Keep points inside a vertical cylinder of the given radius and height
    centered on 'center'.
    """
    horizontal_dist = np.linalg.norm(points[:, :2] - center[:2], axis=1)
    vertical_diff = np.abs(points[:, 2] - center[2])
    mask = (horizontal_dist <= radius) & (vertical_diff <= height / 2)
    return points[mask]


def filter_points_within_threshold(points, threshold=15.0):
    """
    Keep points whose Euclidean distance from the sensor origin is at most
    'threshold' meters.
    """
    distances = np.linalg.norm(points, axis=1)
    mask = distances <= threshold
    return points[mask]

def match_existing_cone(
    new_center: np.ndarray,
    new_dims: tuple,
    existing_agents: Dict[str, AgentState],
    distance_threshold: float = 1.0
) -> Optional[str]:
    """
    Find the closest existing cone agent within 'distance_threshold' meters of
    'new_center'. Returns the matching agent id, or None if no agent is close
    enough.
    """
    best_agent_id = None
    best_dist = float('inf')
    for agent_id, agent_state in existing_agents.items():
        old_center = np.array([agent_state.pose.x, agent_state.pose.y, agent_state.pose.z])
        dist = np.linalg.norm(new_center - old_center)
        if dist < distance_threshold and dist < best_dist:
            best_dist = dist
            best_agent_id = agent_id
    return best_agent_id


def compute_velocity(old_pose: ObjectPose, new_pose: ObjectPose, dt: float) -> tuple:
    """
    Compute the (vx, vy, vz) velocity from the change in pose over time 'dt'.
    Returns zero velocity when 'dt' is not positive.
    """
    if dt <= 0:
        return (0, 0, 0)
    vx = (new_pose.x - old_pose.x) / dt
    vy = (new_pose.y - old_pose.y) / dt
    vz = (new_pose.z - old_pose.z) / dt
    return (vx, vy, vz)

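
# --- Illustrative sketch (not part of the original module) ---
# Shows how match_existing_cone() and compute_velocity() might be combined to
# associate a newly detected cone with a tracked agent and estimate its
# velocity. Only the pose attributes already used above (pose.x/y/z) are
# relied on; the function and variable names here are hypothetical.
def _example_update_cone_track(new_center: np.ndarray, new_dims: tuple,
                               new_pose: ObjectPose,
                               tracked_cones: Dict[str, AgentState], dt: float):
    agent_id = match_existing_cone(new_center, new_dims, tracked_cones)
    if agent_id is None:
        # No existing cone nearby; the caller would create a new agent instead.
        return None
    old_pose = tracked_cones[agent_id].pose
    velocity = compute_velocity(old_pose, new_pose, dt)
    return agent_id, velocity
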

def extract_roi_box(lidar_pc, center, half_extents):
    """
    Extract a region of interest (ROI) from the LiDAR point cloud, defined by
    an axis-aligned bounding box with the given center and half extents.
    """
    lower = center - half_extents
    upper = center + half_extents
    mask = np.all((lidar_pc >= lower) & (lidar_pc <= upper), axis=1)
    return lidar_pc[mask]


def pc2_to_numpy(pc2_msg, want_rgb=False):
    """
    Convert a ROS PointCloud2 message into an (N, 3) numpy array of x, y, z
    coordinates using ros_numpy, then apply a fixed crop.
    """
    # Convert the ROS message to a numpy structured array
    pc = ros_numpy.point_cloud2.pointcloud2_to_array(pc2_msg)
    # Stack the x, y, z fields into an (N, 3) array
    pts = np.stack((np.array(pc['x']).ravel(),
                    np.array(pc['y']).ravel(),
                    np.array(pc['z']).ravel()), axis=1)
    # Keep points with x > -0.5 and z between -2.7 and -1
    mask = (pts[:, 0] > -0.5) & (pts[:, 2] < -1) & (pts[:, 2] > -2.7)
    return pts[mask]

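
# --- Illustrative sketch (not part of the original module) ---
# Minimal ROS 1 subscriber showing how pc2_to_numpy() could be used inside a
# callback. The topic name "/lidar/points" and the logging are assumptions,
# not taken from this repository; rospy is imported locally because the
# module above does not import it.
def _example_lidar_subscriber():
    import rospy

    def _on_pointcloud(msg: PointCloud2):
        pts = pc2_to_numpy(msg)                    # (N, 3) cropped LiDAR points
        pts = filter_points_within_threshold(pts)  # keep points within 15 m
        rospy.loginfo("received %d points", pts.shape[0])

    rospy.Subscriber("/lidar/points", PointCloud2, _on_pointcloud, queue_size=1)
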

def backproject_pixel(u, v, K):
    """
    Backprojects a pixel coordinate (u, v) into a normalized 3D ray in the
    camera coordinate system.
    """
    cx, cy = K[0, 2], K[1, 2]
    fx, fy = K[0, 0], K[1, 1]
    x = (u - cx) / fx
    y = (v - cy) / fy
    ray_dir = np.array([x, y, 1.0])
    return ray_dir / np.linalg.norm(ray_dir)

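
# --- Illustrative sketch (not part of the original module) ---
# Worked example for backproject_pixel() with hypothetical intrinsics
# (fx = fy = 600, cx = 320, cy = 240): the pixel (480, 240) maps to the
# unnormalized direction [(480 - 320) / 600, 0, 1] = [0.2667, 0, 1], which
# normalizes to roughly [0.2577, 0.0, 0.9662].
def _example_backprojection():
    K = np.array([[600.0, 0.0, 320.0],
                  [0.0, 600.0, 240.0],
                  [0.0, 0.0, 1.0]])
    ray = backproject_pixel(480, 240, K)
    return ray  # approximately [0.2577, 0.0, 0.9662]
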

def find_human_center_on_ray(lidar_pc, ray_origin, ray_direction,
                             t_min, t_max, t_step,
                             distance_threshold, min_points, ransac_threshold):
    """
    Identify the center of a human along a projected ray.
    (This function is no longer used in the new approach.)
    """
    return None, None, None


def extract_roi(pc, center, roi_radius):
    """
    Extract points from a point cloud that lie within a specified radius of a
    center point.
    """
    distances = np.linalg.norm(pc - center, axis=1)
    return pc[distances < roi_radius]


def refine_cluster(roi_points, center, eps=0.2, min_samples=10):
    """
    Refine a point cluster by applying DBSCAN and return the cluster whose
    centroid is closest to 'center'. Falls back to the input points when no
    valid cluster is found.
    """
    if roi_points.shape[0] < min_samples:
        return roi_points
    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(roi_points)
    labels = clustering.labels_
    valid_clusters = [roi_points[labels == label] for label in set(labels) if label != -1]
    if not valid_clusters:
        return roi_points
    best_cluster = min(valid_clusters, key=lambda c: np.linalg.norm(np.mean(c, axis=0) - center))
    return best_cluster


def remove_ground_by_min_range(cluster, z_range=0.05):
    """
    Remove points within 'z_range' of the minimum z value (assumed to be ground).
    """
    if cluster is None or cluster.shape[0] == 0:
        return cluster
    min_z = np.min(cluster[:, 2])
    filtered = cluster[cluster[:, 2] > (min_z + z_range)]
    return filtered


def get_bounding_box_center_and_dimensions(points):
    """
    Calculate the center and dimensions of the axis-aligned bounding box of a
    set of 3D points. Returns (None, None) for an empty input.
    """
    if points.shape[0] == 0:
        return None, None
    min_vals = np.min(points, axis=0)
    max_vals = np.max(points, axis=0)
    center = (min_vals + max_vals) / 2
    dimensions = max_vals - min_vals
    return center, dimensions

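
# --- Illustrative sketch (not part of the original module) ---
# Shows how the ROI and cluster helpers above might be chained to turn a rough
# 3D seed point into a refined bounding box. The radius and thresholds are
# assumptions, not values taken from this repository.
def _example_refine_detection(lidar_pc: np.ndarray, seed_center: np.ndarray):
    roi = extract_roi(lidar_pc, seed_center, roi_radius=1.0)       # points near the seed
    cluster = refine_cluster(roi, seed_center, eps=0.2, min_samples=10)
    cluster = remove_ground_by_min_range(cluster, z_range=0.05)    # drop ground returns
    center, dims = get_bounding_box_center_and_dimensions(cluster)
    return center, dims  # (None, None) if the cluster ended up empty
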

def create_ray_line_set(start, end):
    """
    Create an Open3D LineSet representing a ray between two 3D points.
    The line is colored yellow.
    """
    points = [start, end]
    lines = [[0, 1]]
    line_set = o3d.geometry.LineSet()
    line_set.points = o3d.utility.Vector3dVector(points)
    line_set.lines = o3d.utility.Vector2iVector(lines)
    line_set.colors = o3d.utility.Vector3dVector([[1, 1, 0]])
    return line_set


def downsample_points(lidar_points, voxel_size=0.15):
    """
    Voxel-downsample an (N, 3) array of points using Open3D and return the
    result as a numpy array.
    """
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(lidar_points)
    down_pcd = pcd.voxel_down_sample(voxel_size=voxel_size)
    return np.asarray(down_pcd.points)


def filter_depth_points(lidar_points, max_depth_diff=0.9, use_norm=True):
    """
    Keep only points whose depth is within 'max_depth_diff' of the closest
    point, i.e. the foreground surface nearest the sensor. Depth is the
    Euclidean norm when 'use_norm' is True, otherwise the x coordinate.
    """
    if lidar_points.shape[0] == 0:
        return lidar_points

    if use_norm:
        depths = np.linalg.norm(lidar_points, axis=1)
    else:
        depths = lidar_points[:, 0]

    min_depth = np.min(depths)
    max_possible_depth = min_depth + max_depth_diff
    mask = depths < max_possible_depth
    return lidar_points[mask]

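
# --- Illustrative sketch (not part of the original module) ---
# Shows how downsample_points() and filter_depth_points() might be applied to
# an ROI before clustering: voxel downsampling reduces the point count, and
# the depth filter keeps only the foreground surface closest to the sensor.
# The parameter values simply repeat the defaults above.
def _example_preprocess_roi(roi_points: np.ndarray) -> np.ndarray:
    pts = downsample_points(roi_points, voxel_size=0.15)
    return filter_depth_points(pts, max_depth_diff=0.9, use_norm=True)
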

def visualize_geometries(geometries, window_name="Open3D", width=800, height=600, point_size=5.0):
    """
    Visualize a list of Open3D geometry objects in a dedicated window.
    """
    vis = o3d.visualization.Visualizer()
    vis.create_window(window_name=window_name, width=width, height=height)
    for geom in geometries:
        vis.add_geometry(geom)
    opt = vis.get_render_option()
    opt.point_size = point_size
    vis.run()
    vis.destroy_window()


def transform_points_l2c(lidar_points, T_l2c):
    """
    Transform (N, 3) LiDAR points into the camera frame using the homogeneous
    4x4 transform T_l2c.
    """
    N = lidar_points.shape[0]
    pts_hom = np.hstack((lidar_points, np.ones((N, 1))))  # (N, 4)
    pts_cam = (T_l2c @ pts_hom.T).T  # (N, 4)
    return pts_cam[:, :3]


# ----- New: Vectorized projection function -----
def project_points(pts_cam, K, original_lidar_points):
    """
    Vectorized projection of camera-frame points into the image plane.
    pts_cam: (N, 3) array of points in camera coordinates.
    original_lidar_points: (N, 3) array of the same points in LiDAR coordinates.
    Returns an (M, 5) array [u, v, X_lidar, Y_lidar, Z_lidar] containing only
    the points with positive depth (Z > 0).
    """
    mask = pts_cam[:, 2] > 0
    pts_cam_valid = pts_cam[mask]
    lidar_valid = original_lidar_points[mask]
    Xc = pts_cam_valid[:, 0]
    Yc = pts_cam_valid[:, 1]
    Zc = pts_cam_valid[:, 2]
    u = (K[0, 0] * (Xc / Zc) + K[0, 2]).astype(np.int32)
    v = (K[1, 1] * (Yc / Zc) + K[1, 2]).astype(np.int32)
    proj = np.column_stack((u, v, lidar_valid))
    return proj
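

# --- Illustrative sketch (not part of the original module) ---
# End-to-end use of transform_points_l2c() and project_points(): LiDAR points
# are moved into the camera frame with an assumed 4x4 extrinsic T_l2c and then
# projected with intrinsics K. Each returned row pairs a pixel coordinate with
# the originating LiDAR point, e.g. for overlaying the cloud on an image or for
# looking up the 3D points inside a 2D detection box.
def _example_project_to_image(lidar_points: np.ndarray,
                              T_l2c: np.ndarray, K: np.ndarray) -> np.ndarray:
    pts_cam = transform_points_l2c(lidar_points, T_l2c)
    return project_points(pts_cam, K, lidar_points)  # (M, 5): [u, v, X, Y, Z]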