|
2 | 2 | from ..interface.gem import GEMInterface |
3 | 3 | from ..component import Component |
4 | 4 | from ultralytics import YOLO |
5 | | -import cv2 |
6 | 5 | from typing import Dict |
7 | | -import open3d as o3d |
8 | | -import numpy as np |
9 | 6 | from sklearn.cluster import DBSCAN |
10 | 7 | from scipy.spatial.transform import Rotation as R |
11 | 8 | import rospy |
12 | 9 | from sensor_msgs.msg import PointCloud2, Image |
13 | | -import sensor_msgs.point_cloud2 as pc2 |
14 | | -import struct, ctypes |
15 | 10 | from message_filters import Subscriber, ApproximateTimeSynchronizer |
16 | 11 | from cv_bridge import CvBridge |
17 | 12 | import time |
18 | | -import math |
19 | | -import ros_numpy |
20 | | - |
21 | | - |
22 | | -# ----- Helper Functions ----- |
23 | | - |
24 | | -def match_existing_pedestrian( |
25 | | - new_center: np.ndarray, |
26 | | - new_dims: tuple, |
27 | | - existing_agents: Dict[str, AgentState], |
28 | | - distance_threshold: float = 1.0 |
29 | | -) -> str: |
30 | | - """ |
31 | | - Find the closest existing pedestrian agent within a specified distance threshold. |
32 | | - """ |
33 | | - best_agent_id = None |
34 | | - best_dist = float('inf') |
35 | | - for agent_id, agent_state in existing_agents.items(): |
36 | | - old_center = np.array([agent_state.pose.x, agent_state.pose.y, agent_state.pose.z]) |
37 | | - dist = np.linalg.norm(new_center - old_center) |
38 | | - if dist < distance_threshold and dist < best_dist: |
39 | | - best_dist = dist |
40 | | - best_agent_id = agent_id |
41 | | - return best_agent_id |
42 | | - |
43 | | - |
44 | | -def compute_velocity(old_pose: ObjectPose, new_pose: ObjectPose, dt: float) -> tuple: |
45 | | - """ |
46 | | - Compute the (vx, vy, vz) velocity based on change in pose over time. |
47 | | - """ |
48 | | - if dt <= 0: |
49 | | - return (0, 0, 0) |
50 | | - vx = (new_pose.x - old_pose.x) / dt |
51 | | - vy = (new_pose.y - old_pose.y) / dt |
52 | | - vz = (new_pose.z - old_pose.z) / dt |
53 | | - return (vx, vy, vz) |
54 | | - |
55 | | - |
56 | | -def extract_roi_box(lidar_pc, center, half_extents): |
57 | | - """ |
58 | | - Extract a region of interest (ROI) from the LiDAR point cloud defined by an axis-aligned bounding box. |
59 | | - """ |
60 | | - lower = center - half_extents |
61 | | - upper = center + half_extents |
62 | | - mask = np.all((lidar_pc >= lower) & (lidar_pc <= upper), axis=1) |
63 | | - return lidar_pc[mask] |
64 | | - |
65 | | - |
66 | | -# def pc2_to_numpy(pc2_msg, want_rgb=False): |
67 | | -# """ |
68 | | -# Convert a ROS PointCloud2 message into a numpy array. |
69 | | -# This function extracts the x, y, z coordinates from the point cloud. |
70 | | -# """ |
71 | | -# start = time.time() |
72 | | -# gen = pc2.read_points(pc2_msg, skip_nans=True) |
73 | | -# end = time.time() |
74 | | -# print('Read lidar points: ', end - start) |
75 | | -# start = time.time() |
76 | | -# pts = np.array(list(gen), dtype=np.float16) |
77 | | -# pts = pts[:, :3] # Only x, y, z coordinates |
78 | | -# mask = (pts[:, 0] > 0) & (pts[:, 2] < 2.5) |
79 | | -# end = time.time() |
80 | | -# print('Convert to numpy: ', end - start) |
81 | | -# return pts[mask] |
82 | | - |
83 | | -def pc2_to_numpy(pc2_msg, want_rgb=False): |
84 | | - """ |
85 | | - Convert a ROS PointCloud2 message into a numpy array quickly using ros_numpy. |
86 | | - This function extracts the x, y, z coordinates from the point cloud. |
87 | | - """ |
88 | | - # Convert the ROS message to a numpy structured array |
89 | | - pc = ros_numpy.point_cloud2.pointcloud2_to_array(pc2_msg) |
90 | | - # Convert each field to a 1D array and stack along axis 1 to get (N, 3) |
91 | | - pts = np.stack((np.array(pc['x']).ravel(), |
92 | | - np.array(pc['y']).ravel(), |
93 | | - np.array(pc['z']).ravel()), axis=1) |
94 | | - # Apply filtering (for example, x > 0 and z < 2.5) |
95 | | - mask = (pts[:, 0] > 0) & (pts[:, 2] < 2.5) |
96 | | - return pts[mask] |
97 | | - |
98 | | - |
99 | | - |
100 | | -def backproject_pixel(u, v, K): |
101 | | - """ |
102 | | - Backprojects a pixel coordinate (u, v) into a normalized 3D ray in the camera coordinate system. |
103 | | - """ |
104 | | - cx, cy = K[0, 2], K[1, 2] |
105 | | - fx, fy = K[0, 0], K[1, 1] |
106 | | - x = (u - cx) / fx |
107 | | - y = (v - cy) / fy |
108 | | - ray_dir = np.array([x, y, 1.0]) |
109 | | - return ray_dir / np.linalg.norm(ray_dir) |
110 | | - |
111 | | - |
112 | | -def find_human_center_on_ray(lidar_pc, ray_origin, ray_direction, |
113 | | - t_min, t_max, t_step, |
114 | | - distance_threshold, min_points, ransac_threshold): |
115 | | - """ |
116 | | - Identify the center of a human along a projected ray. |
117 | | - (This function is no longer used in the new approach.) |
118 | | - """ |
119 | | - return None, None, None |
120 | | - |
121 | | - |
122 | | -def extract_roi(pc, center, roi_radius): |
123 | | - """ |
124 | | - Extract points from a point cloud that lie within a specified radius of a center point. |
125 | | - """ |
126 | | - distances = np.linalg.norm(pc - center, axis=1) |
127 | | - return pc[distances < roi_radius] |
128 | | - |
129 | | - |
130 | | -def refine_cluster(roi_points, center, eps=0.2, min_samples=10): |
131 | | - """ |
132 | | - Refine a point cluster by applying DBSCAN and return the cluster closest to 'center'. |
133 | | - """ |
134 | | - if roi_points.shape[0] < min_samples: |
135 | | - return roi_points |
136 | | - clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(roi_points) |
137 | | - labels = clustering.labels_ |
138 | | - valid_clusters = [roi_points[labels == l] for l in set(labels) if l != -1] |
139 | | - if not valid_clusters: |
140 | | - return roi_points |
141 | | - best_cluster = min(valid_clusters, key=lambda c: np.linalg.norm(np.mean(c, axis=0) - center)) |
142 | | - return best_cluster |
143 | | - |
144 | | - |
145 | | -def remove_ground_by_min_range(cluster, z_range=0.05): |
146 | | - """ |
147 | | - Remove points within z_range of the minimum z (assumed to be ground). |
148 | | - """ |
149 | | - if cluster is None or cluster.shape[0] == 0: |
150 | | - return cluster |
151 | | - min_z = np.min(cluster[:, 2]) |
152 | | - filtered = cluster[cluster[:, 2] > (min_z + z_range)] |
153 | | - return filtered |
154 | | - |
155 | | - |
156 | | -def get_bounding_box_center_and_dimensions(points): |
157 | | - """ |
158 | | - Calculate the axis-aligned bounding box's center and dimensions for a set of 3D points. |
159 | | - """ |
160 | | - if points.shape[0] == 0: |
161 | | - return None, None |
162 | | - min_vals = np.min(points, axis=0) |
163 | | - max_vals = np.max(points, axis=0) |
164 | | - center = (min_vals + max_vals) / 2 |
165 | | - dimensions = max_vals - min_vals |
166 | | - return center, dimensions |
167 | | - |
168 | | - |
169 | | -def create_ray_line_set(start, end): |
170 | | - """ |
171 | | - Create an Open3D LineSet object representing a ray between two 3D points. |
172 | | - The line is colored yellow. |
173 | | - """ |
174 | | - points = [start, end] |
175 | | - lines = [[0, 1]] |
176 | | - line_set = o3d.geometry.LineSet() |
177 | | - line_set.points = o3d.utility.Vector3dVector(points) |
178 | | - line_set.lines = o3d.utility.Vector2iVector(lines) |
179 | | - line_set.colors = o3d.utility.Vector3dVector([[1, 1, 0]]) |
180 | | - return line_set |
181 | | - |
182 | | -def downsample_points(lidar_points, voxel_size=0.15): |
183 | | - pcd = o3d.geometry.PointCloud() |
184 | | - pcd.points = o3d.utility.Vector3dVector(lidar_points) |
185 | | - down_pcd = pcd.voxel_down_sample(voxel_size=voxel_size) |
186 | | - return np.asarray(down_pcd.points) |
187 | | - |
188 | | -def filter_depth_points(lidar_points, max_human_depth=0.9): |
189 | | - |
190 | | - if lidar_points.shape[0] == 0: |
191 | | - return lidar_points |
192 | | - lidar_points_dist = lidar_points[:, 0] |
193 | | - min_dist = np.min(lidar_points_dist) |
194 | | - max_possible_dist = min_dist + max_human_depth |
195 | | - filtered_array = lidar_points[lidar_points_dist < max_possible_dist] |
196 | | - return filtered_array |
197 | | - |
198 | | -def visualize_geometries(geometries, window_name="Open3D", width=800, height=600, point_size=5.0): |
199 | | - """ |
200 | | - Visualize a list of Open3D geometry objects in a dedicated window. |
201 | | - """ |
202 | | - vis = o3d.visualization.Visualizer() |
203 | | - vis.create_window(window_name=window_name, width=width, height=height) |
204 | | - for geom in geometries: |
205 | | - vis.add_geometry(geom) |
206 | | - opt = vis.get_render_option() |
207 | | - opt.point_size = point_size |
208 | | - vis.run() |
209 | | - vis.destroy_window() |
210 | | - |
211 | | -def pose_to_matrix(pose): |
212 | | - """ |
213 | | - Compose a 4x4 transformation matrix from a pose state. |
214 | | - Assumes pose has attributes: x, y, z, yaw, pitch, roll, |
215 | | - where the angles are given in degrees. |
216 | | - """ |
217 | | - # Use default values if any are None (e.g. if the car is not moving) |
218 | | - x = pose.x if pose.x is not None else 0.0 |
219 | | - y = pose.y if pose.y is not None else 0.0 |
220 | | - z = pose.z if pose.z is not None else 0.0 |
221 | | - if pose.yaw is not None and pose.pitch is not None and pose.roll is not None: |
222 | | - yaw = math.radians(pose.yaw) |
223 | | - pitch = math.radians(pose.pitch) |
224 | | - roll = math.radians(pose.roll) |
225 | | - else: |
226 | | - yaw = 0.0 |
227 | | - pitch = 0.0 |
228 | | - roll = 0.0 |
229 | | - R_mat = R.from_euler('zyx', [yaw, pitch, roll]).as_matrix() |
230 | | - T = np.eye(4) |
231 | | - T[:3, :3] = R_mat |
232 | | - T[:3, 3] = np.array([x, y, z]) |
233 | | - return T |
234 | | - |
235 | | - |
236 | | -def transform_points_l2c(lidar_points, T_l2c): |
237 | | - N = lidar_points.shape[0] |
238 | | - pts_hom = np.hstack((lidar_points, np.ones((N, 1)))) # (N,4) |
239 | | - pts_cam = (T_l2c @ pts_hom.T).T # (N,4) |
240 | | - return pts_cam[:, :3] |
241 | | - |
242 | | -# ----- New: Vectorized projection function ----- |
243 | | -def project_points(pts_cam, K, original_lidar_points): |
244 | | - """ |
245 | | - Vectorized version. |
246 | | - pts_cam: (N,3) array of points in camera coordinates. |
247 | | - original_lidar_points: (N,3) array of points in LiDAR coordinates. |
248 | | - Returns a (M,5) array: [u, v, X_lidar, Y_lidar, Z_lidar] for all points with Z>0. |
249 | | - """ |
250 | | - mask = pts_cam[:, 2] > 0 |
251 | | - pts_cam_valid = pts_cam[mask] |
252 | | - lidar_valid = original_lidar_points[mask] |
253 | | - Xc = pts_cam_valid[:, 0] |
254 | | - Yc = pts_cam_valid[:, 1] |
255 | | - Zc = pts_cam_valid[:, 2] |
256 | | - u = (K[0, 0] * (Xc / Zc) + K[0, 2]).astype(np.int32) |
257 | | - v = (K[1, 1] * (Yc / Zc) + K[1, 2]).astype(np.int32) |
258 | | - proj = np.column_stack((u, v, lidar_valid)) |
259 | | - return proj |
| 13 | +from .pedestrian_utils import * # Import the moved helper functions |
| 14 | +from .pedestrian_utils_gem import * # Import the moved GEM related helper functions |
260 | 15 | |
261 | 16 | |
262 | 17 | # ----- Pedestrian Detector 2D (New Approach) ----- |
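
The helpers deleted above (point-cloud conversion, ray backprojection, ROI extraction, DBSCAN cluster refinement, pose and LiDAR-to-camera projection math) are now pulled in through the two wildcard imports added in this hunk. Below is a minimal sketch of the equivalent explicit imports, assuming the moved helpers keep their original names; how they are split between `pedestrian_utils` and `pedestrian_utils_gem` is a guess, not confirmed by the diff.

```python
# Sketch only: explicit-import alternative to the wildcard imports.
# Assumes the moved helpers keep their original names; the split between
# pedestrian_utils and pedestrian_utils_gem below is hypothetical.
from .pedestrian_utils import (
    pc2_to_numpy,               # PointCloud2 -> (N, 3) numpy array
    backproject_pixel,          # pixel (u, v) -> unit ray in camera frame
    extract_roi, extract_roi_box,
    refine_cluster,             # DBSCAN cluster closest to a seed center
    remove_ground_by_min_range,
    get_bounding_box_center_and_dimensions,
    downsample_points, filter_depth_points,
    pose_to_matrix, transform_points_l2c, project_points,
)
from .pedestrian_utils_gem import match_existing_pedestrian, compute_velocity
```

Explicit imports keep the detector's namespace unambiguous and make it easy to see which module each helper landed in.
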
@@ -302,7 +57,8 @@ def initialize(self): |
302 | 57 | self.sync = ApproximateTimeSynchronizer([self.rgb_sub, self.lidar_sub], |
303 | 58 | queue_size=10, slop=0.1) |
304 | 59 | self.sync.registerCallback(self.synchronized_callback) |
305 | | - self.detector = YOLO('../../knowledge/detection/yolov8s.pt') |
| 60 | + # self.detector = YOLO('../../knowledge/detection/yolov8s.pt') |
| 61 | + self.detector = YOLO('GEMstack/knowledge/detection/yolov8s.pt') |
306 | 62 | self.detector.to('cuda') |
307 | 63 | self.K = np.array([[684.83331299, 0., 573.37109375], |
308 | 64 | [0., 684.60968018, 363.70092773], |
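
The new weights path 'GEMstack/knowledge/detection/yolov8s.pt' is resolved against the process working directory, so it only loads when the stack is launched from the repository root. A minimal sketch of a launch-directory-independent alternative, assuming this module lives under GEMstack/onboard/perception/ (as the old '../../knowledge/...' path implies):

```python
# Sketch only: resolve the YOLO weights relative to this file instead of
# the current working directory. Assumes the layout
#   GEMstack/onboard/perception/<this module>.py
#   GEMstack/knowledge/detection/yolov8s.pt
import pathlib
from ultralytics import YOLO

_WEIGHTS = (pathlib.Path(__file__).resolve().parents[2]
            / "knowledge" / "detection" / "yolov8s.pt")
detector = YOLO(str(_WEIGHTS))
detector.to("cuda")
```
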
|