1 | | -from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum |
| 1 | +from ...state import AllState, VehicleState, ObjectPose, ObjectFrameEnum, AgentState, AgentEnum, AgentActivityEnum |
2 | 2 | from ..interface.gem import GEMInterface |
3 | 3 | from ..component import Component |
4 | 4 | from ultralytics import YOLO |
5 | 5 | import cv2 |
| 6 | +import numpy as np |
6 | 7 | from typing import Dict |
7 | 8 |
| 9 | +# ============================================================================= |
| 10 | +# Global camera intrinsics (placeholder values – move these to your settings file later) |
| 11 | +# The camera frame is assumed to be: |
| 12 | +# - x: right, y: down, z: forward |
| 13 | +# These values should be obtained via calibration (e.g., from the /oak/rgb/camera_info topic) |
| 14 | +# ============================================================================= |
| 15 | +CAMERA_INTRINSICS = { |
| 16 | + 'fx': 500.0, # focal length in pixels (replace with your calibrated value) |
| 17 | + 'fy': 500.0, |
| 18 | + 'cx': 320.0, # principal point x (replace with your calibrated value) |
| 19 | + 'cy': 240.0, # principal point y (replace with your calibrated value) |
| 20 | +} |
| 21 | + |
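| | +# A minimal sketch (assuming a ROS 1 driver and rospy are available) of filling the
| | +# intrinsics from the camera_info topic mentioned above instead of hard-coding them.
| | +# CameraInfo.K is a row-major 3x3 matrix, so fx = K[0], cx = K[2], fy = K[4], cy = K[5].
| | +def load_camera_intrinsics_from_ros(topic='/oak/rgb/camera_info', timeout=5.0):
| | +    import rospy
| | +    from sensor_msgs.msg import CameraInfo
| | +    info = rospy.wait_for_message(topic, CameraInfo, timeout=timeout)
| | +    return {'fx': info.K[0], 'fy': info.K[4], 'cx': info.K[2], 'cy': info.K[5]}
| | +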
| 22 | +# ============================================================================= |
| 23 | +# LIDAR to Camera Transform (placeholder, replace with your actual calibration matrix) |
| 24 | +# This 4x4 homogeneous transformation converts points in the lidar frame to the camera frame. |
| 25 | +# Replace the identity matrix below with your actual calibration data. |
| 26 | +# ============================================================================= |
| 27 | +LIDAR_TO_CAMERA_TRANSFORM = np.eye(4) # TODO: replace with your calibrated transform |
| 28 | + |
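| | +# For illustration only (made-up values): a calibrated transform is typically
| | +# assembled from a 3x3 rotation R and a translation t in meters, e.g.:
| | +#
| | +#   R = np.eye(3)                  # replace with the calibrated rotation
| | +#   t = np.array([0.0, 0.0, 0.0])  # replace with the calibrated translation
| | +#   T = np.eye(4)
| | +#   T[:3, :3] = R
| | +#   T[:3, 3] = t
| | +#   LIDAR_TO_CAMERA_TRANSFORM = T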
| 29 | + |
8 | 30 | def box_to_fake_agent(box): |
9 | 31 | """Creates a fake agent state from an (x,y,w,h) bounding box. |
10 | 32 | |
11 | | - The location and size are pretty much meaningless since this is just giving a 2D location. |
 | 33 | + The location and size are placeholders, since this only gives a 2D image-plane location;
 | 34 | + (x, y) is the box center, matching YOLO's xywh convention.
| 34 | + """ |
 | 35 | +    x, y, w, h = box  # (x, y) is already the box center in YOLO's xywh format
 | 36 | +    pose = ObjectPose(t=0, x=x, y=y, z=0,
 | 37 | +                      yaw=0, pitch=0, roll=0, frame=ObjectFrameEnum.CURRENT)
| 38 | + dims = (w, h, 0) |
| 39 | + return AgentState(pose=pose, dimensions=dims, outline=None, type=AgentEnum.PEDESTRIAN, |
| 40 | + activity=AgentActivityEnum.MOVING, velocity=(0, 0, 0), yaw_rate=0) |
| 41 | + |
| 42 | + |
| 43 | +def get_average_cam_point_from_lidar(bbox, point_cloud): |
| 44 | + """ |
 | 45 | + Given a center-format bounding box (cx, cy, w, h) in image coordinates (YOLO's boxes.xywh) and a lidar point cloud (in the lidar frame),
| 46 | + this function: |
| 47 | + 1. Converts the point cloud to homogeneous coordinates. |
| 48 | + 2. Transforms the points into the camera frame using LIDAR_TO_CAMERA_TRANSFORM. |
| 49 | + 3. Projects the points into the image using the camera intrinsics. |
| 50 | + 4. Selects points that fall inside the bounding box. |
| 51 | + 5. Returns the average 3D point (in the camera frame) of those points. |
| 52 | + |
| 53 | + Returns: |
| 54 | + avg_point: A NumPy array [x_cam, y_cam, z_cam] in the camera frame, |
| 55 | + or None if no points are found inside the bbox. |
| 56 | + """ |
 | 57 | +    cx, cy, w, h = bbox  # (cx, cy) is the box center (YOLO xywh format)
| 58 | + if point_cloud is None or point_cloud.shape[0] == 0: |
| 59 | + return None |
| 60 | + |
| 61 | + # Convert the point cloud (assumed shape (N,3)) to homogeneous coordinates (N,4) |
| 62 | + ones = np.ones((point_cloud.shape[0], 1)) |
| 63 | + points_homog = np.hstack((point_cloud, ones)) # shape (N,4) |
| 64 | + |
| 65 | + # Transform points from lidar frame to camera frame |
| 66 | + cam_points_homog = (LIDAR_TO_CAMERA_TRANSFORM @ points_homog.T).T # shape (N,4) |
| 67 | + cam_points = cam_points_homog[:, :3] # (x_cam, y_cam, z_cam) |
| 68 | + |
| 69 | + # Only consider points in front of the camera (z_cam > 0) |
| 70 | + valid_mask = cam_points[:, 2] > 0 |
| 71 | + cam_points = cam_points[valid_mask] |
| 72 | + if cam_points.shape[0] == 0: |
| 73 | + return None |
| 74 | + |
| 75 | + # Project points into the image using the pinhole camera model: |
| 76 | + # u = fx * (x_cam / z_cam) + cx, v = fy * (y_cam / z_cam) + cy |
| 77 | + u = CAMERA_INTRINSICS['fx'] * (cam_points[:, 0] / cam_points[:, 2]) + CAMERA_INTRINSICS['cx'] |
| 78 | + v = CAMERA_INTRINSICS['fy'] * (cam_points[:, 1] / cam_points[:, 2]) + CAMERA_INTRINSICS['cy'] |
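| | +    # Equivalently (a sketch, useful for checking): with a 3x3 intrinsics matrix K,
| | +    # the same projection is K applied to the normalized camera points:
| | +    #
| | +    #   K = np.array([[CAMERA_INTRINSICS['fx'], 0, CAMERA_INTRINSICS['cx']],
| | +    #                 [0, CAMERA_INTRINSICS['fy'], CAMERA_INTRINSICS['cy']],
| | +    #                 [0, 0, 1]])
| | +    #   uv1 = (K @ (cam_points / cam_points[:, 2:3]).T).T  # columns: u, v, 1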
| 79 | + |
 | 80 | +    # Create a mask for points that fall within the center-format bounding box.
 | 81 | +    in_bbox_mask = (u >= cx - w / 2) & (u <= cx + w / 2) & (v >= cy - h / 2) & (v <= cy + h / 2)
| 82 | + if np.sum(in_bbox_mask) == 0: |
| 83 | + return None |
| 84 | + |
| 85 | + selected_points = cam_points[in_bbox_mask] |
| 86 | + # Compute the average of the selected points (mean of x, y, and z) |
| 87 | + avg_point = np.mean(selected_points, axis=0) |
| 88 | + return avg_point |
| 89 | + |
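| | +# Note: the mean can be pulled off the pedestrian by background points that also
| | +# project into the box. A slightly more robust variant (a sketch, not required) is
| | +# the per-axis median of the selected points:
| | +#
| | +#   avg_point = np.median(selected_points, axis=0)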
| 90 | + |
| 91 | +def camera_to_vehicle(cam_point): |
| 92 | + """ |
| 93 | + Convert a point from the camera frame (x_cam, y_cam, z_cam) to the vehicle frame. |
| 94 | + Assumptions: |
| 95 | + - Camera frame: x to the right, y down, z forward. |
| 96 | + - Vehicle frame: x forward, y to the left, z up. |
| 97 | + |
| 98 | + The conversion is: |
| 99 | + x_vehicle = z_cam |
| 100 | + y_vehicle = -x_cam |
| 101 | + z_vehicle = -y_cam |
| 102 | + """ |
| 103 | + x_cam, y_cam, z_cam = cam_point |
| 104 | + x_vehicle = z_cam |
| 105 | + y_vehicle = -x_cam |
| 106 | + z_vehicle = -y_cam |
| 107 | + return np.array([x_vehicle, y_vehicle, z_vehicle]) |
| 108 | + |
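| | +# Sanity check: the same axis remapping written as a rotation matrix. This is a pure
| | +# permutation; any camera mounting offset or tilt would require the full extrinsics.
| | +#
| | +#   R_CAM_TO_VEHICLE = np.array([[ 0.,  0.,  1.],
| | +#                                [-1.,  0.,  0.],
| | +#                                [ 0., -1.,  0.]])
| | +#   vehicle_point = R_CAM_TO_VEHICLE @ np.asarray(cam_point)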
| 109 | + |
| 110 | +def box_to_agent_with_lidar(bbox, point_cloud): |
12 | 111 | """ |
13 | | - x,y,w,h = box |
14 | | - pose = ObjectPose(t=0,x=x+w/2,y=y+h/2,z=0,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT) |
15 | | - dims = (w,h,0) |
16 | | - return AgentState(pose=pose,dimensions=dims,outline=None,type=AgentEnum.PEDESTRIAN,activity=AgentActivityEnum.MOVING,velocity=(0,0,0),yaw_rate=0) |
| 112 | + Given a bounding box and a lidar point cloud, use the lidar data to estimate the 3D position |
| 113 | + of the detected pedestrian. |
| 114 | + """ |
| 115 | + avg_cam_point = get_average_cam_point_from_lidar(bbox, point_cloud) |
| 116 | + if avg_cam_point is None: |
| 117 | + # If no lidar points fall within the bounding box, fall back to a 2D-only approximation. |
| 118 | + return box_to_fake_agent(bbox) |
| 119 | + |
| 120 | + # Convert the averaged 3D point (in the camera frame) to the vehicle frame. |
| 121 | + vehicle_point = camera_to_vehicle(avg_cam_point) |
| 122 | + |
| 123 | + # Create an ObjectPose using the vehicle coordinates. |
| 124 | + pose = ObjectPose(t=0, x=vehicle_point[0], y=vehicle_point[1], z=vehicle_point[2], |
| 125 | + yaw=0, pitch=0, roll=0, frame=ObjectFrameEnum.CURRENT) |
| 126 | + |
| 127 | + # Estimate the dimensions in meters by projecting the bounding box dimensions using the average depth. |
| 128 | + z = avg_cam_point[2] |
| 129 | + width_m = (bbox[2] * z) / CAMERA_INTRINSICS['fx'] |
| 130 | + height_m = (bbox[3] * z) / CAMERA_INTRINSICS['fy'] |
| 131 | + dims = (width_m, height_m, 0.5) # Assume a fixed thickness of 0.5 m for a pedestrian. |
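| | +    # Worked example: with fx = 500 px and z = 10 m, a 50 px-wide box maps to
| | +    # (50 * 10) / 500 = 1.0 m.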
| 132 | + |
| 133 | + return AgentState(pose=pose, dimensions=dims, outline=None, type=AgentEnum.PEDESTRIAN, |
| 134 | + activity=AgentActivityEnum.MOVING, velocity=(0, 0, 0), yaw_rate=0) |
17 | 135 |
18 | 136 |
19 | 137 | class PedestrianDetector2D(Component): |
20 | | - """Detects pedestrians.""" |
21 | | - def __init__(self,vehicle_interface : GEMInterface): |
| 138 | + """Detects pedestrians by fusing 2D image detections with lidar point cloud data.""" |
| 139 | + def __init__(self, vehicle_interface: GEMInterface): |
22 | 140 | self.vehicle_interface = vehicle_interface |
23 | | - # self.detector = YOLO('../../knowledge/detection/yolov11n.pt') |
24 | | - self.last_person_boxes = [] |
| 141 | + self.detector = None # YOLO detector for image processing |
 | 142 | +        self.last_person_boxes = []  # 2D (cx, cy, w, h) boxes from the image (YOLO xywh)
| 143 | + self.latest_point_cloud = None # Latest lidar point cloud (in the lidar frame) |
25 | 144 |
26 | 145 | def rate(self): |
27 | 146 | return 4.0 |
28 | | - |
| 147 | + |
29 | 148 | def state_inputs(self): |
30 | 149 | return ['vehicle'] |
31 | | - |
| 150 | + |
32 | 151 | def state_outputs(self): |
33 | 152 | return ['agents'] |
34 | | - |
| 153 | + |
35 | 154 | def initialize(self): |
36 | | - #tell the vehicle to use image_callback whenever 'front_camera' gets a reading, and it expects images of type cv2.Mat |
| 155 | + # Subscribe to the front camera for 2D detection. |
37 | 156 | self.detector = YOLO('../../knowledge/detection/yolov11n.pt') |
38 | 157 | self.vehicle_interface.subscribe_sensor('front_camera', self.image_callback, cv2.Mat) |
| 158 | + # Subscribe to the lidar sensor (e.g., "top_lidar"). Assume it provides a NumPy array for the point cloud. |
| 159 | + self.vehicle_interface.subscribe_sensor('top_lidar', self.lidar_callback, np.ndarray) |
39 | 160 |
40 | 161 | def image_callback(self, image: cv2.Mat): |
41 | | - """Image processing callback with original detection logic""" |
| 162 | + """Process the image to detect pedestrians using YOLO.""" |
42 | 163 | results = self.detector(image, conf=0.5) |
43 | 164 | boxes = results[0].boxes |
44 | | - |
45 | 165 | if len(boxes) == 0: |
46 | 166 | self.last_person_boxes = [] |
47 | 167 | return |
48 | 168 | cls_ids = boxes.cls.cpu() |
49 | 169 | xywh = boxes.xywh.cpu() |
50 | 170 | person_mask = (cls_ids == 0) |
51 | | - |
52 | 171 | self.last_person_boxes = [tuple(map(float, box)) for box in xywh[person_mask]] |
53 | | - |
54 | | - def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: |
| 172 | + |
| 173 | + def lidar_callback(self, point_cloud): |
| 174 | + """Receive and store the latest lidar point cloud (assumed to be a NumPy array of shape (N,3)).""" |
| 175 | + self.latest_point_cloud = point_cloud |
| 176 | + |
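| | +    # If the interface instead delivers raw sensor_msgs/PointCloud2 messages, a
| | +    # conversion sketch (ROS 1; assumes 'x', 'y', 'z' fields) would be:
| | +    #
| | +    #   from sensor_msgs import point_cloud2
| | +    #   xyz = np.array(list(point_cloud2.read_points(
| | +    #       msg, field_names=('x', 'y', 'z'), skip_nans=True)))
| | +    #   self.latest_point_cloud = xyz
| | +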
| 177 | + def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: |
55 | 178 | res = {} |
56 | | - for i,b in enumerate(self.last_person_boxes): |
57 | | - x,y,w,h = b |
58 | | - res['pedestrian'+str(i)] = box_to_fake_agent(b) |
| 179 | + for i, bbox in enumerate(self.last_person_boxes): |
| 180 | + # Fuse the 2D detection with lidar data to estimate a 3D location. |
| 181 | + agent_state = box_to_agent_with_lidar(bbox, self.latest_point_cloud) |
| 182 | + res['pedestrian' + str(i)] = agent_state |
59 | 183 | if len(res) > 0: |
60 | | - print("Detected",len(res),"pedestrians") |
| 184 | + print("Detected", len(res), "pedestrians") |
61 | 185 | return res |
62 | 186 |
63 | 187 |
64 | 188 | class FakePedestrianDetector2D(Component): |
65 | | - """Triggers a pedestrian detection at some random time ranges""" |
66 | | - def __init__(self,vehicle_interface : GEMInterface): |
| 189 | + """Triggers a pedestrian detection at some random time ranges.""" |
| 190 | + def __init__(self, vehicle_interface: GEMInterface): |
67 | 191 | self.vehicle_interface = vehicle_interface |
68 | | - self.times = [(5.0,20.0),(30.0,35.0)] |
| 192 | + self.times = [(5.0, 20.0), (30.0, 35.0)] |
69 | 193 | self.t_start = None |
70 | 194 |
71 | 195 | def rate(self): |
72 | 196 | return 4.0 |
73 | | - |
| 197 | + |
74 | 198 | def state_inputs(self): |
75 | 199 | return ['vehicle'] |
76 | | - |
| 200 | + |
77 | 201 | def state_outputs(self): |
78 | 202 | return ['agents'] |
79 | | - |
80 | | - def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: |
| 203 | + |
| 204 | + def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: |
81 | 205 | if self.t_start is None: |
82 | 206 | self.t_start = self.vehicle_interface.time() |
83 | 207 | t = self.vehicle_interface.time() - self.t_start |
84 | 208 | res = {} |
85 | 209 | for times in self.times: |
86 | 210 | if t >= times[0] and t <= times[1]: |
87 | | - res['pedestrian0'] = box_to_fake_agent((0,0,0,0)) |
| 211 | + res['pedestrian0'] = box_to_fake_agent((0, 0, 0, 0)) |
88 | 212 | print("Detected a pedestrian") |
89 | 213 | return res |