|
1 | | -from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum |
| 1 | +from ...state import AllState, VehicleState, ObjectPose, ObjectFrameEnum, AgentState, AgentEnum, AgentActivityEnum |
2 | 2 | from ..interface.gem import GEMInterface |
3 | 3 | from ..component import Component |
4 | 4 | from ultralytics import YOLO |
5 | 5 | import cv2 |
6 | 6 | from typing import Dict |
| 7 | +import open3d as o3d |
| 8 | +import numpy as np |
| 9 | +from sklearn.cluster import DBSCAN |
| 10 | +from scipy.spatial.transform import Rotation as R |
| 11 | + |
def backproject_pixel(u, v, K):
    """Convert a pixel coordinate (u, v) into a unit-length viewing ray.

    Args:
        u, v: pixel coordinates in the image.
        K: 3x3 camera intrinsic matrix.

    Returns:
        np.ndarray of shape (3,): unit direction of the ray through the pixel,
        expressed in camera coordinates.
    """
    fx, fy = K[0, 0], K[1, 1]
    cx, cy = K[0, 2], K[1, 2]
    direction = np.array([(u - cx) / fx, (v - cy) / fy, 1.0])
    return direction / np.linalg.norm(direction)
| 20 | + |
def find_human_center_on_ray(lidar_pc, ray_origin, ray_direction,
                             t_min, t_max, t_step,
                             distance_threshold, min_points, ransac_threshold):
    """Walk along a ray and return the first 3D position with enough LiDAR support.

    Probes positions `ray_origin + t * ray_direction` for t in
    [t_min, t_max) at `t_step` increments, stopping at the first probe that has
    at least `min_points` cloud points within `distance_threshold`.

    Returns:
        (candidate, None, None) for the first qualifying probe, or
        (None, None, None) if no probe qualifies. The trailing Nones preserve
        the (center, plane, inliers) signature; plane fitting is intentionally
        omitted, so `ransac_threshold` is accepted but unused.
    """
    for t in np.arange(t_min, t_max, t_step):
        probe = ray_origin + t * ray_direction
        support = np.count_nonzero(np.linalg.norm(lidar_pc - probe, axis=1) < distance_threshold)
        if support >= min_points:
            return probe, None, None
    return None, None, None
| 35 | + |
def extract_roi(pc, center, roi_radius):
    """Return the subset of `pc` lying strictly within `roi_radius` of `center`.

    Args:
        pc: (N, 3) array of points.
        center: (3,) query position.
        roi_radius: sphere radius; inclusion uses a strict `<` comparison.

    Returns:
        (M, 3) array of the points inside the sphere (possibly empty).
    """
    inside = np.linalg.norm(pc - center, axis=1) < roi_radius
    return pc[inside]
| 40 | + |
def refine_cluster(roi_points, center, eps=0.2, min_samples=10):
    """Cluster the ROI with DBSCAN and keep the cluster whose centroid is nearest `center`.

    Noise points (label -1) are discarded. If DBSCAN yields no cluster at all,
    the unfiltered ROI is returned unchanged as a fallback.
    """
    labels = DBSCAN(eps=eps, min_samples=min_samples).fit(roi_points).labels_
    clusters = [roi_points[labels == label] for label in set(labels) if label != -1]
    if not clusters:
        return roi_points

    def centroid_gap(cluster):
        # Distance from the cluster centroid to the ray-sweep candidate.
        return np.linalg.norm(cluster.mean(axis=0) - center)

    return min(clusters, key=centroid_gap)
| 50 | + |
def remove_ground_by_min_range(cluster, z_range=0.05):
    """Drop points within `z_range` of the cluster's lowest z value (assumed ground).

    Returns the input unchanged when it is None or empty; otherwise a (possibly
    empty) array containing only points with z strictly above min_z + z_range.
    """
    if cluster is None or cluster.shape[0] == 0:
        return cluster
    ground_level = cluster[:, 2].min()
    keep = cluster[:, 2] > ground_level + z_range
    return cluster[keep]
7 | 61 |
|
8 | 62 | def box_to_fake_agent(box): |
9 | 63 | """Creates a fake agent state from an (x,y,w,h) bounding box. |
@@ -34,31 +88,71 @@ def state_outputs(self): |
34 | 88 |
|
def initialize(self):
    """Load the person detector, subscribe to the camera feed, and cache calibration.

    Registers `image_callback` on the 'front_camera' sensor and precomputes the
    camera intrinsics plus the LiDAR<->camera extrinsics used to cast detection
    rays into the LiDAR frame.
    """
    self.detector = YOLO('../../knowledge/detection/yolov8n.pt')
    self.vehicle_interface.subscribe_sensor('front_camera', self.image_callback, cv2.Mat)
    # Camera intrinsic matrix (fx, fy, cx, cy from calibration).
    self.K = np.array([[684.83331299, 0., 573.37109375],
                       [0., 684.60968018, 363.70092773],
                       [0., 0., 1.]])
    # Homogeneous rigid transform taking LiDAR-frame points into the camera frame.
    self.T_l2c = np.array([[-0.01909581, -0.9997844, 0.0081547, 0.24521313],
                           [0.06526397, -0.00938524, -0.9978239, -0.80389025],
                           [0.9976853, -0.01852205, 0.06542912, -0.6605772],
                           [0., 0., 0., 1.]])
    # Inverse transform (camera -> LiDAR) and its pieces: rotation for ray
    # directions, translation as the camera optical center in LiDAR coordinates.
    T_c2l = np.linalg.inv(self.T_l2c)
    self.T_c2l = T_c2l
    self.R_c2l = T_c2l[:3, :3]
    self.camera_origin_in_lidar = T_c2l[:3, 3]
|
def image_callback(self, image: cv2.Mat):
    """Run person detection on a camera frame and cache the resulting boxes.

    Only COCO class 0 (person) is requested from the detector, at confidence
    >= 0.5. Boxes are stored on `self.last_person_boxes` as
    [center_x, center_y, width, height] lists for the next `update()` call.
    """
    detections = self.detector(image, conf=0.5, classes=[0])
    self.last_person_boxes = detections[0].boxes.xywh.cpu().tolist()
def update(self, vehicle: VehicleState) -> dict:
    """Fuse the latest 2D person detections with LiDAR to produce 3D agents.

    For each cached bounding box, the center pixel is backprojected to a ray in
    the LiDAR frame, a coarse 3D candidate is found by sweeping the ray, and
    the supporting point cluster is refined (ROI crop -> DBSCAN -> ground
    removal) into an oriented bounding box.

    Args:
        vehicle: current vehicle state; `vehicle.lidar` is expected to hold an
            (N, 3) point cloud in the LiDAR frame -- TODO confirm frame/units.

    Returns:
        Dict mapping 'pedestrian<i>' to AgentState, one per localized detection.
    """
    agents = {}
    # Robustness fixes: `last_person_boxes` is only created by image_callback,
    # so update() could run before the first frame; the point cloud may also be
    # absent. Bail out cleanly instead of raising AttributeError.
    lidar_pc = getattr(vehicle, 'lidar', None)
    boxes = getattr(self, 'last_person_boxes', [])
    if lidar_pc is None or len(boxes) == 0:
        return agents
    for i, (cx, cy, w, h) in enumerate(boxes):
        # Backproject the box center pixel into a unit ray in the LiDAR frame.
        ray_dir_cam = backproject_pixel(cx, cy, self.K)
        ray_dir_lidar = self.R_c2l @ ray_dir_cam
        ray_dir_lidar /= np.linalg.norm(ray_dir_lidar)
        # Coarse depth estimate: first spot on the ray with enough LiDAR support.
        intersection, _, _ = find_human_center_on_ray(
            lidar_pc, self.camera_origin_in_lidar, ray_dir_lidar,
            t_min=0.5, t_max=20.0, t_step=0.1,
            distance_threshold=0.2, min_points=20, ransac_threshold=0.05)
        if intersection is None:
            continue
        # Refine: local ROI -> cluster nearest the candidate -> drop ground returns.
        roi_points = extract_roi(lidar_pc, intersection, roi_radius=1.0)
        if roi_points.shape[0] < 20:
            refined_cluster = roi_points
        else:
            refined_cluster = refine_cluster(roi_points, intersection, eps=0.15, min_samples=10)
        refined_cluster = remove_ground_by_min_range(refined_cluster, z_range=0.05)
        # Open3D's get_oriented_bounding_box needs at least 4 non-degenerate
        # points; the previous `shape[0] == 0` check let 1-3 point clusters
        # through and crashed. Fall back to the coarse estimate instead.
        if refined_cluster is None or refined_cluster.shape[0] < 4:
            refined_center = intersection
            dims = (0, 0, 0)
            yaw = pitch = roll = 0
        else:
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(refined_cluster)
            try:
                obb = pcd.get_oriented_bounding_box()
            except RuntimeError:
                # Degenerate (e.g. coplanar) cluster: keep the coarse estimate.
                refined_center = intersection
                dims = (0, 0, 0)
                yaw = pitch = roll = 0
            else:
                refined_center = obb.center
                dims = tuple(obb.extent)
                # 'zyx' Euler order in degrees -> (yaw, pitch, roll).
                yaw, pitch, roll = R.from_matrix(obb.R.copy()).as_euler('zyx', degrees=True)
        pose = ObjectPose(t=vehicle.time, x=refined_center[0], y=refined_center[1],
                          z=refined_center[2], yaw=yaw, pitch=pitch, roll=roll,
                          frame=ObjectFrameEnum.CURRENT)
        agents[f'pedestrian{i}'] = AgentState(
            pose=pose, dimensions=dims, outline=None,
            type=AgentEnum.PEDESTRIAN, activity=AgentActivityEnum.MOVING,
            velocity=(0, 0, 0), yaw_rate=0)
    return agents
62 | 156 |
|
63 | 157 |
|
64 | 158 | class FakePedestrianDetector2D(Component): |
|
0 commit comments