diff --git a/GEMstack/onboard/perception/cone_detection.py b/GEMstack/onboard/perception/cone_detection.py
new file mode 100644
index 000000000..3aebb199b
--- /dev/null
+++ b/GEMstack/onboard/perception/cone_detection.py
@@ -0,0 +1,563 @@
+from ...state import AllState, VehicleState, ObjectPose, ObjectFrameEnum, AgentState, AgentEnum, AgentActivityEnum
+from ..interface.gem import GEMInterface
+from ..component import Component
+from ultralytics import YOLO
+import cv2
+from typing import Dict
+import open3d as o3d
+import numpy as np
+from sklearn.cluster import DBSCAN
+from scipy.spatial.transform import Rotation as R
+import rospy
+from sensor_msgs.msg import PointCloud2, Image
+import sensor_msgs.point_cloud2 as pc2
+import struct, ctypes
+from message_filters import Subscriber, ApproximateTimeSynchronizer
+from cv_bridge import CvBridge
+import time
+import math
+import ros_numpy
+
+
+# ----- Helper Functions -----
+
+def undistort_image(image, K, D):
+    """
+    Undistort an image and return the corrected image together with the new intrinsic matrix.
+    K is the original intrinsic matrix and D holds the distortion coefficients.
+    """
+    h, w = image.shape[:2]
+    newK, _ = cv2.getOptimalNewCameraMatrix(K, D, (w, h), 1, (w, h))
+    undistorted = cv2.undistort(image, K, D, None, newK)
+    return undistorted, newK
+
+def filter_points_within_threshold(points, threshold=15.0):
+    distances = np.linalg.norm(points, axis=1)
+    mask = distances <= threshold
+    return points[mask]
+
+
+def match_existing_cone(
+        new_center: np.ndarray,
+        new_dims: tuple,
+        existing_agents: Dict[str, AgentState],
+        distance_threshold: float = 1.0
+) -> str:
+    """
+    Find the closest existing Cone agent within a specified distance threshold.
+    """
+    best_agent_id = None
+    best_dist = float('inf')
+    for agent_id, agent_state in existing_agents.items():
+        old_center = np.array([agent_state.pose.x, agent_state.pose.y, agent_state.pose.z])
+        dist = np.linalg.norm(new_center - old_center)
+        if dist < distance_threshold and dist < best_dist:
+            best_dist = dist
+            best_agent_id = agent_id
+    return best_agent_id
+
+
+def compute_velocity(old_pose: ObjectPose, new_pose: ObjectPose, dt: float) -> tuple:
+    """
+    Compute the (vx, vy, vz) velocity based on change in pose over time.
+    """
+    if dt <= 0:
+        return (0, 0, 0)
+    vx = (new_pose.x - old_pose.x) / dt
+    vy = (new_pose.y - old_pose.y) / dt
+    vz = (new_pose.z - old_pose.z) / dt
+    return (vx, vy, vz)
+
+
+def extract_roi_box(lidar_pc, center, half_extents):
+    """
+    Extract a region of interest (ROI) from the LiDAR point cloud defined by an axis-aligned bounding box.
+    """
+    lower = center - half_extents
+    upper = center + half_extents
+    mask = np.all((lidar_pc >= lower) & (lidar_pc <= upper), axis=1)
+    return lidar_pc[mask]
+
+
+def pc2_to_numpy(pc2_msg, want_rgb=False):
+    """
+    Convert a ROS PointCloud2 message into a numpy array quickly using ros_numpy.
+    This function extracts the x, y, z coordinates from the point cloud.
+    """
+    # Convert the ROS message to a numpy structured array
+    pc = ros_numpy.point_cloud2.pointcloud2_to_array(pc2_msg)
+    # Stack x,y,z fields to a (N,3) array
+    pts = np.stack((np.array(pc['x']).ravel(),
+                    np.array(pc['y']).ravel(),
+                    np.array(pc['z']).ravel()), axis=1)
+    # Apply filtering (for example, x > 0 and z in a specified range)
+    mask = (pts[:, 0] > 0) & (pts[:, 2] < -1.5) & (pts[:, 2] > -2.7)
+    return pts[mask]
+
+
+def backproject_pixel(u, v, K):
+    """
+    Backprojects a pixel coordinate (u, v) into a normalized 3D ray in the camera coordinate system.
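+    The ray follows the pinhole model: x = (u - cx) / fx, y = (v - cy) / fy, and the
+    direction (x, y, 1) is then normalized to unit length.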
+ """ + cx, cy = K[0, 2], K[1, 2] + fx, fy = K[0, 0], K[1, 1] + x = (u - cx) / fx + y = (v - cy) / fy + ray_dir = np.array([x, y, 1.0]) + return ray_dir / np.linalg.norm(ray_dir) + + +def find_human_center_on_ray(lidar_pc, ray_origin, ray_direction, + t_min, t_max, t_step, + distance_threshold, min_points, ransac_threshold): + """ + Identify the center of a human along a projected ray. + (This function is no longer used in the new approach.) + """ + return None, None, None + + +def extract_roi(pc, center, roi_radius): + """ + Extract points from a point cloud that lie within a specified radius of a center point. + """ + distances = np.linalg.norm(pc - center, axis=1) + return pc[distances < roi_radius] + + +def refine_cluster(roi_points, center, eps=0.2, min_samples=10): + """ + Refine a point cluster by applying DBSCAN and return the cluster closest to 'center'. + """ + if roi_points.shape[0] < min_samples: + return roi_points + clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(roi_points) + labels = clustering.labels_ + valid_clusters = [roi_points[labels == l] for l in set(labels) if l != -1] + if not valid_clusters: + return roi_points + best_cluster = min(valid_clusters, key=lambda c: np.linalg.norm(np.mean(c, axis=0) - center)) + return best_cluster + + +def remove_ground_by_min_range(cluster, z_range=0.05): + """ + Remove points within z_range of the minimum z (assumed to be ground). + """ + if cluster is None or cluster.shape[0] == 0: + return cluster + min_z = np.min(cluster[:, 2]) + filtered = cluster[cluster[:, 2] > (min_z + z_range)] + return filtered + + +def get_bounding_box_center_and_dimensions(points): + """ + Calculate the axis-aligned bounding box's center and dimensions for a set of 3D points. + """ + if points.shape[0] == 0: + return None, None + min_vals = np.min(points, axis=0) + max_vals = np.max(points, axis=0) + center = (min_vals + max_vals) / 2 + dimensions = max_vals - min_vals + return center, dimensions + + +def create_ray_line_set(start, end): + """ + Create an Open3D LineSet object representing a ray between two 3D points. + The line is colored yellow. + """ + points = [start, end] + lines = [[0, 1]] + line_set = o3d.geometry.LineSet() + line_set.points = o3d.utility.Vector3dVector(points) + line_set.lines = o3d.utility.Vector2iVector(lines) + line_set.colors = o3d.utility.Vector3dVector([[1, 1, 0]]) + return line_set + + +def downsample_points(lidar_points, voxel_size=0.15): + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(lidar_points) + down_pcd = pcd.voxel_down_sample(voxel_size=voxel_size) + return np.asarray(down_pcd.points) + + +def filter_depth_points(lidar_points, max_depth_diff=0.9, use_norm=True): + if lidar_points.shape[0] == 0: + return lidar_points + + if use_norm: + depths = np.linalg.norm(lidar_points, axis=1) + else: + depths = lidar_points[:, 0] + + min_depth = np.min(depths) + max_possible_depth = min_depth + max_depth_diff + mask = depths < max_possible_depth + return lidar_points[mask] + + +def visualize_geometries(geometries, window_name="Open3D", width=800, height=600, point_size=5.0): + """ + Visualize a list of Open3D geometry objects in a dedicated window. 
+ """ + vis = o3d.visualization.Visualizer() + vis.create_window(window_name=window_name, width=width, height=height) + for geom in geometries: + vis.add_geometry(geom) + opt = vis.get_render_option() + opt.point_size = point_size + vis.run() + vis.destroy_window() + + +def pose_to_matrix(pose): + """ + Compose a 4x4 transformation matrix from a pose state. + Assumes pose has attributes: x, y, z, yaw, pitch, roll, + where the angles are given in degrees. + """ + x = pose.x if pose.x is not None else 0.0 + y = pose.y if pose.y is not None else 0.0 + z = pose.z if pose.z is not None else 0.0 + if pose.yaw is not None and pose.pitch is not None and pose.roll is not None: + yaw = math.radians(pose.yaw) + pitch = math.radians(pose.pitch) + roll = math.radians(pose.roll) + else: + yaw = 0.0 + pitch = 0.0 + roll = 0.0 + R_mat = R.from_euler('zyx', [yaw, pitch, roll]).as_matrix() + T = np.eye(4) + T[:3, :3] = R_mat + T[:3, 3] = np.array([x, y, z]) + return T + + +def transform_points_l2c(lidar_points, T_l2c): + N = lidar_points.shape[0] + pts_hom = np.hstack((lidar_points, np.ones((N, 1)))) # (N,4) + pts_cam = (T_l2c @ pts_hom.T).T # (N,4) + return pts_cam[:, :3] + + +# ----- New: Vectorized projection function ----- +def project_points(pts_cam, K, original_lidar_points): + """ + Vectorized version. + pts_cam: (N,3) array of points in camera coordinates. + original_lidar_points: (N,3) array of points in LiDAR coordinates. + Returns a (M,5) array: [u, v, X_lidar, Y_lidar, Z_lidar] for all points with Z>0. + """ + mask = pts_cam[:, 2] > 0 + pts_cam_valid = pts_cam[mask] + lidar_valid = original_lidar_points[mask] + Xc = pts_cam_valid[:, 0] + Yc = pts_cam_valid[:, 1] + Zc = pts_cam_valid[:, 2] + u = (K[0, 0] * (Xc / Zc) + K[0, 2]).astype(np.int32) + v = (K[1, 1] * (Yc / Zc) + K[1, 2]).astype(np.int32) + proj = np.column_stack((u, v, lidar_valid)) + return proj + + +class ConeDetector3D(Component): + """ + Detects cones by fusing YOLO 2D detections with LiDAR point cloud data. + + New approach: + 1) Downsample the LiDAR point cloud. + 2) Transform it to the camera coordinate system. + 3) Project all points onto the image plane. + 4) Filter the projected points using the YOLO 2D bounding boxes. + 5) Apply DBSCAN clustering and remove ground points. + 6) Compute an oriented bounding box and transform to the Vehicle frame. + 7) (Optional) Reproject the refined cluster onto the image. + + (Note: Do not add any additional visualization beyond what is necessary.) 
+ """ + + def __init__(self, vehicle_interface: GEMInterface): + self.vehicle_interface = vehicle_interface + self.current_agents = {} + self.tracked_agents = {} + self.cone_counter = 0 + self.latest_image = None + self.latest_lidar = None + self.bridge = CvBridge() + self.start_pose_abs = None + self.camera_front = True + + def rate(self) -> float: + return 4.0 + + def state_inputs(self) -> list: + return ['vehicle'] + + def state_outputs(self) -> list: + return ['agents'] + + def initialize(self): + self.rgb_sub = Subscriber('/oak/rgb/image_raw', Image) + self.lidar_sub = Subscriber('/ouster/points', PointCloud2) + self.sync = ApproximateTimeSynchronizer([self.rgb_sub, self.lidar_sub], + queue_size=10, slop=0.1) + self.sync.registerCallback(self.synchronized_callback) + self.detector = YOLO('../../knowledge/detection/cone.pt') + self.detector.to('cuda') + + if self.camera_front: + self.K = np.array([[684.83331299, 0., 573.37109375], + [0., 684.60968018, 363.70092773], + [0., 0., 1.]]) + else: + self.K = np.array([[1230.144096, 0., 978.828508], + [0., 1230.630424, 605.794034], + [0., 0., 1.]]) + + if self.camera_front: + self.D = np.array([0.0, 0.0, 0.0, 0.0, 0.0]) + else: + self.D = [-0.23751890570984993, 0.08452214195986749, -0.00035324203850054794, -0.0003762498910536819, 0.0] + + self.T_l2v = np.array([[0.99939639, 0.02547917, 0.023615, 1.1], + [-0.02530848, 0.99965156, -0.00749882, 0.03773583], + [-0.02379784, 0.00689664, 0.999693, 1.95320223], + [0., 0., 0., 1.]]) + if self.camera_front: + self.T_l2c = np.array([ + [ 2.89748006e-02, -9.99580136e-01, 3.68439439e-05, -3.07300513e-02], + [-9.49930618e-03, -3.12215512e-04, -9.99954834e-01, -3.86689354e-01], + [ 9.99534999e-01, 2.89731321e-02, -9.50437214e-03, -6.71425124e-01], + [ 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.00000000e+00] + ]) + else: + self.T_l2c = np.array([[ 0.71082304, -0.70305212, -0.02608284, 0.17771596], + [-0.13651802, -0.10076507, -0.98505595, -0.56321222], + [ 0.68915595, 0.70388118, -0.1678969 , -0.62027912], + [ 0. , 0. , 0. , 1. 
]])
+        self.T_c2l = np.linalg.inv(self.T_l2c)
+        self.R_c2l = self.T_c2l[:3, :3]
+        self.camera_origin_in_lidar = self.T_c2l[:3, 3]
+
+    def synchronized_callback(self, image_msg, lidar_msg):
+        step1 = time.time()
+        try:
+            self.latest_image = self.bridge.imgmsg_to_cv2(image_msg, "bgr8")
+        except Exception as e:
+            rospy.logerr("Failed to convert image: {}".format(e))
+            self.latest_image = None
+        step2 = time.time()
+        self.latest_lidar = pc2_to_numpy(lidar_msg, want_rgb=False)
+        step3 = time.time()
+        print('image callback: ', step2 - step1, 'lidar callback ', step3 - step2)
+
+    def update(self, vehicle: VehicleState) -> Dict[str, AgentState]:
+        downsample = False
+        if self.latest_image is None or self.latest_lidar is None:
+            return {}
+
+        current_time = self.vehicle_interface.time()
+
+        undistorted_img, current_K = undistort_image(self.latest_image, self.K, self.D)
+        self.current_K = current_K
+        self.latest_image = undistorted_img
+
+        # Run YOLO to obtain 2D detections (class 0: cones)
+        results = self.detector(self.latest_image, conf=0.3, classes=[0])
+        boxes = np.array(results[0].boxes.xywh.cpu())
+        agents = {}
+
+        if downsample == True:
+            lidar_down = downsample_points(self.latest_lidar, voxel_size=0.1)
+            pts_cam = transform_points_l2c(lidar_down, self.T_l2c)
+            projected_pts = project_points(pts_cam, self.current_K, lidar_down)
+        else:
+            step00 = time.time()
+            lidar_down = self.latest_lidar.copy()
+            step01 = time.time()
+            pts_cam = transform_points_l2c(lidar_down, self.T_l2c)
+            step02 = time.time()
+            projected_pts = project_points(pts_cam, self.current_K, lidar_down)  # shape (N,5): [u, v, X, Y, Z]
+            step03 = time.time()
+            print(
+                f'copy lidar data {step01 - step00}s, transforming to camera {step02 - step01}s, projecting to image {step03 - step02}s')
+
+        # For each 2D bounding box, filter projected points instead of ray-casting
+        for i, box in enumerate(boxes):
+            start = time.time()
+            cx, cy, w, h = box
+            left = int(cx - w / 1.8)
+            right = int(cx + w / 1.8)
+            top = int(cy - h / 1.8)
+            bottom = int(cy + h / 1.8)
+            mask = (projected_pts[:, 0] >= left) & (projected_pts[:, 0] <= right) & \
+                   (projected_pts[:, 1] >= top) & (projected_pts[:, 1] <= bottom)
+            roi_pts = projected_pts[mask]
+            if roi_pts.shape[0] < 5:
+                continue
+            # Extract the LiDAR 3D points corresponding to the ROI
+            points_3d = roi_pts[:, 2:5]
+            points_3d = remove_ground_by_min_range(points_3d, z_range=0.005)
+            points_3d = filter_points_within_threshold(points_3d, 15)
+            points_3d = filter_depth_points(points_3d, max_depth_diff=0.3)
+
+            # Cluster the points and remove ground
+            refined_cluster = refine_cluster(points_3d, np.mean(points_3d, axis=0), eps=0.5, min_samples=10)
+            refined_cluster = remove_ground_by_min_range(refined_cluster, z_range=0.01)
+            end1 = time.time()
+            print('refine cluster: ', end1 - start)
+            if refined_cluster.shape[0] < 3:
+                continue
+
+            # Compute the oriented bounding box
+            pcd = o3d.geometry.PointCloud()
+            pcd.points = o3d.utility.Vector3dVector(refined_cluster)
+            obb = pcd.get_oriented_bounding_box()
+            refined_center = obb.center
+            dims = tuple(obb.extent)
+            R_lidar = obb.R.copy()
+            end2 = time.time()
+            print('compute bounding box ', end2 - end1)
+            # Transform the refined center from LiDAR to Vehicle frame
+            refined_center_hom = np.append(refined_center, 1)
+            refined_center_vehicle_hom = self.T_l2v @ refined_center_hom
+            refined_center_vehicle = refined_center_vehicle_hom[:3]
+
+            R_vehicle = self.T_l2v[:3, :3] @ R_lidar
+            euler_vehicle = R.from_matrix(R_vehicle).as_euler('zyx', degrees=False)
+            yaw, pitch,
roll = euler_vehicle + refined_center = refined_center_vehicle + + if self.start_pose_abs is None: + self.start_pose_abs = vehicle.pose + + vehicle_start_pose = vehicle.pose.to_frame(ObjectFrameEnum.START, vehicle.pose, self.start_pose_abs) + T_vehicle_to_start = pose_to_matrix(vehicle_start_pose) + refined_center_hom_vehicle = np.append(refined_center, 1) + refined_center_start = (T_vehicle_to_start @ refined_center_hom_vehicle)[:3] + + new_pose = ObjectPose( + t=current_time, + x=refined_center_start[0], + y=refined_center_start[1], + z=refined_center_start[2], + yaw=yaw, + pitch=pitch, + roll=roll, + frame=ObjectFrameEnum.START + ) + + existing_id = match_existing_cone( + new_center=np.array([new_pose.x, new_pose.y, new_pose.z]), + new_dims=dims, + existing_agents=self.tracked_agents, + distance_threshold=2.0 + ) + if existing_id is not None: + old_state = self.tracked_agents[existing_id] + if vehicle.v < 0.1: + alpha = 0.1 + avg_x = alpha * new_pose.x + (1 - alpha) * old_state.pose.x + avg_y = alpha * new_pose.y + (1 - alpha) * old_state.pose.y + avg_z = alpha * new_pose.z + (1 - alpha) * old_state.pose.z + avg_yaw = alpha * new_pose.yaw + (1 - alpha) * old_state.pose.yaw + avg_pitch = alpha * new_pose.pitch + (1 - alpha) * old_state.pose.pitch + avg_roll = alpha * new_pose.roll + (1 - alpha) * old_state.pose.roll + + updated_pose = ObjectPose( + t=new_pose.t, + x=avg_x, + y=avg_y, + z=avg_z, + yaw=avg_yaw, + pitch=avg_pitch, + roll=avg_roll, + frame=new_pose.frame + ) + updated_agent = AgentState( + pose=updated_pose, + dimensions=dims, + outline=None, + type=AgentEnum.CONE, + activity=AgentActivityEnum.MOVING, + velocity=(0, 0, 0), + yaw_rate=0 + ) + else: + updated_agent = old_state + agents[existing_id] = updated_agent + self.tracked_agents[existing_id] = updated_agent + else: + agent_id = f"Cone{self.cone_counter}" + self.cone_counter += 1 + new_agent = AgentState( + pose=new_pose, + dimensions=dims, + outline=None, + type=AgentEnum.CONE, + activity=AgentActivityEnum.MOVING, + velocity=(0, 0, 0), + yaw_rate=0 + ) + agents[agent_id] = new_agent + self.tracked_agents[agent_id] = new_agent + + self.current_agents = agents + + stale_ids = [agent_id for agent_id, agent in self.tracked_agents.items() + if current_time - agent.pose.t > 5.0] + for agent_id in stale_ids: + rospy.loginfo(f"Removing stale agent: {agent_id}\n") + for agent_id, agent in agents.items(): + p = agent.pose + rospy.loginfo( + f"Agent ID: {agent_id}\n" + f"Pose: (x: {p.x:.3f}, y: {p.y:.3f}, z: {p.z:.3f}, " + f"yaw: {p.yaw:.3f}, pitch: {p.pitch:.3f}, roll: {p.roll:.3f})\n" + f"Velocity: (vx: {agent.velocity[0]:.3f}, vy: {agent.velocity[1]:.3f}, vz: {agent.velocity[2]:.3f})\n" + ) + return agents + + # ----- Fake Cone Detector 2D (for Testing Purposes) ----- + +class FakConeDetector(Component): + def __init__(self, vehicle_interface: GEMInterface): + self.vehicle_interface = vehicle_interface + self.times = [(5.0, 20.0), (30.0, 35.0)] + self.t_start = None + + def rate(self): + return 4.0 + + def state_inputs(self): + return ['vehicle'] + + def state_outputs(self): + return ['agents'] + + def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: + if self.t_start is None: + self.t_start = self.vehicle_interface.time() + t = self.vehicle_interface.time() - self.t_start + res = {} + for time_range in self.times: + if t >= time_range[0] and t <= time_range[1]: + res['cone0'] = box_to_fake_agent((0, 0, 0, 0)) + rospy.loginfo("Detected a Cone (simulated)") + return res + +def box_to_fake_agent(box): + x, y, w, h = 
box + pose = ObjectPose(t=0, x=x + w / 2, y=y + h / 2, z=0, yaw=0, pitch=0, roll=0, frame=ObjectFrameEnum.CURRENT) + dims = (w, h, 0) + return AgentState(pose=pose, dimensions=dims, outline=None, + type=AgentEnum.CONE, activity=AgentActivityEnum.MOVING, + velocity=(0, 0, 0), yaw_rate=0) + +if __name__ == '__main__': + pass diff --git a/GEMstack/onboard/perception/pedestrian_detection.py b/GEMstack/onboard/perception/pedestrian_detection.py index be4001e76..4efe85bca 100644 --- a/GEMstack/onboard/perception/pedestrian_detection.py +++ b/GEMstack/onboard/perception/pedestrian_detection.py @@ -15,191 +15,8 @@ from message_filters import Subscriber, ApproximateTimeSynchronizer from cv_bridge import CvBridge import time - -''' -The following function is contributed by Aavi Deora -Essentially this method is simliar to teamB's method, i.e. transform point cloud from lidar to image frame -Though we find this calculation is heavy, this method have potential usage in other parts, e.g. replacing DBSCAN -''' - -# def get_average_cam_point_from_lidar(bbox, point_cloud): -# """ -# Given a bounding box (x, y, w, h) in image coordinates and a lidar point cloud (in the lidar frame), -# this function: -# 1. Converts the point cloud to homogeneous coordinates. -# 2. Transforms the points into the camera frame using LIDAR_TO_CAMERA_TRANSFORM. -# 3. Projects the points into the image using the camera intrinsics. -# 4. Selects points that fall inside the bounding box. -# 5. Returns the average 3D point (in the camera frame) of those points. -# -# Returns: -# avg_point: A NumPy array [x_cam, y_cam, z_cam] in the camera frame, -# or None if no points are found inside the bbox. -# """ -# x, y, w, h = bbox -# if point_cloud is None or point_cloud.shape[0] == 0: -# return None -# -# # Convert the point cloud (assumed shape (N,3)) to homogeneous coordinates (N,4) -# ones = np.ones((point_cloud.shape[0], 1)) -# points_homog = np.hstack((point_cloud, ones)) # shape (N,4) -# -# # Transform points from lidar frame to camera frame -# cam_points_homog = (LIDAR_TO_CAMERA_TRANSFORM @ points_homog.T).T # shape (N,4) -# cam_points = cam_points_homog[:, :3] # (x_cam, y_cam, z_cam) -# -# # Only consider points in front of the camera (z_cam > 0) -# valid_mask = cam_points[:, 2] > 0 -# cam_points = cam_points[valid_mask] -# if cam_points.shape[0] == 0: -# return None -# -# # Project points into the image using the pinhole camera model: -# # u = fx * (x_cam / z_cam) + cx, v = fy * (y_cam / z_cam) + cy -# u = CAMERA_INTRINSICS['fx'] * (cam_points[:, 0] / cam_points[:, 2]) + CAMERA_INTRINSICS['cx'] -# v = CAMERA_INTRINSICS['fy'] * (cam_points[:, 1] / cam_points[:, 2]) + CAMERA_INTRINSICS['cy'] -# -# # Create a mask for points that fall within the bounding box. -# in_bbox_mask = (u >= x) & (u <= x + w) & (v >= y) & (v <= y + h) -# if np.sum(in_bbox_mask) == 0: -# return None -# -# selected_points = cam_points[in_bbox_mask] -# return selected_points - - -''' -The following function is contributed by Shuning Liu. -Since the height and velocity detection is not entirely stable, we are not using this matching logic currently. -But we may also test a more robust version of this in future on board test. 
-''' - -# def match_existing_pedestrian( -# new_center: np.ndarray, -# new_dims: tuple, -# existing_agents: Dict[str, AgentState], -# prev_velocities: Dict[str, np.ndarray], -# distance_threshold: float = 1.0, -# size_threshold: float = 0.5, -# height_threshold: float = 0.3, -# velocity_threshold: float = 2.0 -# ) -> str: -# """ -# Match a newly detected pedestrian with an existing one using: -# - Euclidean distance -# - Bounding box similarity -# - Height consistency -# - Velocity consistency -# -# Returns the best-matching agent_id or None if no good match is found. -# """ -# best_agent_id = None -# best_score = float('inf') -# -# for agent_id, agent_state in existing_agents.items(): -# old_center = np.array([agent_state.pose.x, agent_state.pose.y, agent_state.pose.z]) -# old_dims = agent_state.dimensions -# -# # 1. Euclidean Distance Check -# dist = np.linalg.norm(new_center - old_center) -# if dist > distance_threshold: -# continue # Skip if too far away -# -# # 2. Bounding Box Size Similarity (with Zero-Division Handling) -# size_norm = np.linalg.norm(np.array(old_dims)) -# if size_norm > 0: -# size_change = np.linalg.norm(np.array(new_dims) - np.array(old_dims)) / size_norm -# else: -# size_change = float('inf') # Prevent invalid matching -# -# if size_change > size_threshold: -# continue # Skip if bounding box changes too much -# -# # 3. Height Consistency Check -# height_change = abs(new_dims[2] - old_dims[2]) / old_dims[2] if old_dims[2] > 0 else 0 -# if height_change > height_threshold: -# continue # Skip if height changes drastically -# -# # 4. Velocity Consistency Check -# if agent_id in prev_velocities: -# prev_velocity = prev_velocities[agent_id] -# estimated_velocity = (new_center - old_center) -# velocity_change = np.linalg.norm(estimated_velocity - prev_velocity) -# -# if velocity_change > velocity_threshold: -# continue # Skip if unrealistic velocity jump -# -# # Score: Lower score = better match (distance is primary factor) -# score = dist -# if score < best_score: -# best_score = score -# best_agent_id = agent_id -# -# return best_agent_id - -''' -The following function is contributed by Justin Li. An alternative to basic distance-based matching -We plan to test this new tracking logic in the next on board test -''' - -class BoundingBox: - """ - center: center of bounding box in x, y, z coordinates - dimensions: dimensions of bounding box in x, y, z format - orientation: 3x3 rotation matrix - """ - - def __init__( - self, - center: tuple[float, float, float], - dimensions: tuple[float, float, float], - orientation: list[list[float]], - ): - self.center = np.array(center) - self.dimensions = np.array(dimensions) - self.orientation = np.array(orientation) - -def obb_collision(box1: BoundingBox, box2: BoundingBox): - """ - Checks if two oriented bounding boxes (OBBs) collide using the Separating Axis Theorem (SAT). 
- - :param box1: Box with 'center' (x, y, z), 'dimensions' (dx, dy, dz), and 'orientation' (3x3 rotation matrix) - :param box2: Box with 'center' (x, y, z), 'dimensions' (dx, dy, dz), and 'orientation' (3x3 rotation matrix) - :return: Boolean indicating whether the two OBBs collide - """ - - def get_axes(rotation_matrix): - """Extracts the axes from the rotation matrix.""" - return [rotation_matrix[:, i] for i in range(3)] - - def project_obb(obb: BoundingBox, axis): - """Projects the OBB onto a given axis and returns the min and max values.""" - center_proj = np.dot(obb.center, axis) - extents = np.abs(np.dot(obb.orientation, np.diag(obb.dimensions / 2.0))) - radius = np.sum(extents * np.abs(axis)) - return center_proj - radius, center_proj + radius - - def overlap_on_axis(axis, box1, box2): - """Checks if the projections of two OBBs overlap on the given axis.""" - min1, max1 = project_obb(box1, axis) - min2, max2 = project_obb(box2, axis) - return max1 >= min2 and max2 >= min1 - - # Get OBB axes - axes1 = get_axes(box1.orientation) - axes2 = get_axes(box2.orientation) - - # Compute cross products of axes for possible separating axes - cross_axes = [np.cross(a1, a2) for a1 in axes1 for a2 in axes2] - - # Test all axes - for axis in axes1 + axes2 + cross_axes: - if np.linalg.norm(axis) > 1e-6: # Avoid near-zero axes - axis = axis / np.linalg.norm(axis) # Normalize - if not overlap_on_axis(axis, box1, box2): - return False # Separating axis found, no collision - - return True # No separating axis found, collision detected +import math +import ros_numpy # ----- Helper Functions ----- @@ -212,40 +29,21 @@ def match_existing_pedestrian( ) -> str: """ Find the closest existing pedestrian agent within a specified distance threshold. - - Args: - new_center (np.ndarray): The 3D center (x,y,z) of the new detection. - new_dims (tuple): The dimensions of the new detection (currently unused). - existing_agents (Dict[str, AgentState]): Dictionary of currently tracked agents. - distance_threshold (float): Maximum allowed distance to consider a match. - - Returns: - str: The agent_id of the best matching pedestrian if within the threshold; otherwise, None. """ best_agent_id = None best_dist = float('inf') - for agent_id, agent_state in existing_agents.items(): old_center = np.array([agent_state.pose.x, agent_state.pose.y, agent_state.pose.z]) dist = np.linalg.norm(new_center - old_center) if dist < distance_threshold and dist < best_dist: best_dist = dist best_agent_id = agent_id - return best_agent_id def compute_velocity(old_pose: ObjectPose, new_pose: ObjectPose, dt: float) -> tuple: """ - Compute the velocity vector (vx, vy, vz) based on change in pose over a time interval. - - Args: - old_pose (ObjectPose): The previous pose. - new_pose (ObjectPose): The current pose. - dt (float): Time elapsed between the poses. - - Returns: - tuple: (vx, vy, vz) velocity in the same coordinate frame as the input poses. + Compute the (vx, vy, vz) velocity based on change in pose over time. """ if dt <= 0: return (0, 0, 0) @@ -258,14 +56,6 @@ def compute_velocity(old_pose: ObjectPose, new_pose: ObjectPose, dt: float) -> t def extract_roi_box(lidar_pc, center, half_extents): """ Extract a region of interest (ROI) from the LiDAR point cloud defined by an axis-aligned bounding box. - - Args: - lidar_pc (np.ndarray): The LiDAR point cloud as an (N, 3) array. - center (np.ndarray): Center of the ROI box. - half_extents (np.ndarray): Half-lengths in each dimension defining the box. 
- - Returns: - np.ndarray: Subset of points within the ROI. """ lower = center - half_extents upper = center + half_extents @@ -273,38 +63,43 @@ def extract_roi_box(lidar_pc, center, half_extents): return lidar_pc[mask] +# def pc2_to_numpy(pc2_msg, want_rgb=False): +# """ +# Convert a ROS PointCloud2 message into a numpy array. +# This function extracts the x, y, z coordinates from the point cloud. +# """ +# start = time.time() +# gen = pc2.read_points(pc2_msg, skip_nans=True) +# end = time.time() +# print('Read lidar points: ', end - start) +# start = time.time() +# pts = np.array(list(gen), dtype=np.float16) +# pts = pts[:, :3] # Only x, y, z coordinates +# mask = (pts[:, 0] > 0) & (pts[:, 2] < 2.5) +# end = time.time() +# print('Convert to numpy: ', end - start) +# return pts[mask] + def pc2_to_numpy(pc2_msg, want_rgb=False): """ - Convert a ROS PointCloud2 message into a numpy array with basic filtering. - - This function extracts the x, y, z coordinates from the point cloud and - filters out points with x <= 0 and z >= 2.5. - - Args: - pc2_msg: The ROS PointCloud2 message. - want_rgb (bool): Flag to indicate if RGB data is desired (not used here). - - Returns: - np.ndarray: Filtered point cloud array of shape (N, 3). + Convert a ROS PointCloud2 message into a numpy array quickly using ros_numpy. + This function extracts the x, y, z coordinates from the point cloud. """ - gen = pc2.read_points(pc2_msg, skip_nans=True) - pts = np.array(list(gen), dtype=np.float32) - pts = pts[:, :3] # Only x, y, z coordinates + # Convert the ROS message to a numpy structured array + pc = ros_numpy.point_cloud2.pointcloud2_to_array(pc2_msg) + # Convert each field to a 1D array and stack along axis 1 to get (N, 3) + pts = np.stack((np.array(pc['x']).ravel(), + np.array(pc['y']).ravel(), + np.array(pc['z']).ravel()), axis=1) + # Apply filtering (for example, x > 0 and z < 2.5) mask = (pts[:, 0] > 0) & (pts[:, 2] < 2.5) return pts[mask] + def backproject_pixel(u, v, K): """ - Backproject a pixel coordinate (u, v) into a normalized 3D ray in the camera coordinate system. - - Args: - u (float): The pixel's x-coordinate. - v (float): The pixel's y-coordinate. - K (np.ndarray): The 3x3 camera intrinsic matrix. - - Returns: - np.ndarray: A unit vector representing the 3D ray direction. + Backprojects a pixel coordinate (u, v) into a normalized 3D ray in the camera coordinate system. """ cx, cy = K[0, 2], K[1, 2] fx, fy = K[0, 0], K[1, 1] @@ -318,60 +113,15 @@ def find_human_center_on_ray(lidar_pc, ray_origin, ray_direction, t_min, t_max, t_step, distance_threshold, min_points, ransac_threshold): """ - Identify the center of a human detected along a projected ray. - - This function first filters the LiDAR points to only those near the ray. Then it - sweeps along the ray (from t_min to t_max in steps of t_step) to find a candidate - location where enough points (>= min_points) lie within the distance_threshold. - - Args: - lidar_pc (np.ndarray): LiDAR point cloud in 3D. - ray_origin (np.ndarray): Origin of the ray in LiDAR coordinates. - ray_direction (np.ndarray): Normalized direction of the ray. - t_min (float): Minimum distance along the ray to start searching. - t_max (float): Maximum distance along the ray. - t_step (float): Increment along the ray. - distance_threshold (float): Maximum distance from the ray for a point to be considered. - min_points (int): Minimum number of points needed to validate a candidate. - ransac_threshold (float): (Unused in current implementation) threshold for RANSAC. 
- - Returns: - tuple: (refined_candidate, None, None) if a valid candidate is found; otherwise, (None, None, None). + Identify the center of a human along a projected ray. + (This function is no longer used in the new approach.) """ - # Compute projection distances and the corresponding points on the ray. - vecs = lidar_pc - ray_origin - proj_lengths = np.dot(vecs, ray_direction) - proj_points = ray_origin + np.outer(proj_lengths, ray_direction) - dists_to_ray = np.linalg.norm(lidar_pc - proj_points, axis=1) - near_ray_mask = dists_to_ray < distance_threshold - filtered_pc = lidar_pc[near_ray_mask] - - if filtered_pc.shape[0] < min_points: - return None, None, None - - # Sweep along the ray to locate a cluster of points. - t_values = np.arange(t_min, t_max, t_step) - for t in t_values: - candidate = ray_origin + t * ray_direction - dists = np.linalg.norm(filtered_pc - candidate, axis=1) - nearby_points = filtered_pc[dists < distance_threshold] - if nearby_points.shape[0] >= min_points: - refined_candidate = np.mean(nearby_points, axis=0) - return refined_candidate, None, None return None, None, None def extract_roi(pc, center, roi_radius): """ Extract points from a point cloud that lie within a specified radius of a center point. - - Args: - pc (np.ndarray): The point cloud array. - center (np.ndarray): The 3D center point. - roi_radius (float): The radius of the region of interest. - - Returns: - np.ndarray: Points from pc that are within roi_radius of center. """ distances = np.linalg.norm(pc - center, axis=1) return pc[distances < roi_radius] @@ -379,17 +129,10 @@ def extract_roi(pc, center, roi_radius): def refine_cluster(roi_points, center, eps=0.2, min_samples=10): """ - Refine a point cluster by applying DBSCAN and selecting the cluster closest to a given center. - - Args: - roi_points (np.ndarray): Points within a region of interest. - center (np.ndarray): The expected center of the cluster. - eps (float): The maximum distance between two samples for one to be considered in the neighborhood of the other. - min_samples (int): The number of samples required in a neighborhood for a point to be considered as a core point. - - Returns: - np.ndarray: The refined cluster points. If no clusters are found, returns the original points. + Refine a point cluster by applying DBSCAN and return the cluster closest to 'center'. """ + if roi_points.shape[0] < min_samples: + return roi_points clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(roi_points) labels = clustering.labels_ valid_clusters = [roi_points[labels == l] for l in set(labels) if l != -1] @@ -401,14 +144,7 @@ def refine_cluster(roi_points, center, eps=0.2, min_samples=10): def remove_ground_by_min_range(cluster, z_range=0.05): """ - Remove ground points from a cluster by excluding points that lie close to the minimum z value. - - Args: - cluster (np.ndarray): The input point cluster. - z_range (float): The threshold range above the minimum z-value to consider as ground. - - Returns: - np.ndarray: The cluster with ground points removed. + Remove points within z_range of the minimum z (assumed to be ground). """ if cluster is None or cluster.shape[0] == 0: return cluster @@ -420,13 +156,6 @@ def remove_ground_by_min_range(cluster, z_range=0.05): def get_bounding_box_center_and_dimensions(points): """ Calculate the axis-aligned bounding box's center and dimensions for a set of 3D points. - - Args: - points (np.ndarray): The input points. 
- - Returns: - tuple: (center, dimensions) where center is the midpoint of the bounding box and dimensions is (max - min). - Returns (None, None) if the input is empty. """ if points.shape[0] == 0: return None, None @@ -440,15 +169,7 @@ def get_bounding_box_center_and_dimensions(points): def create_ray_line_set(start, end): """ Create an Open3D LineSet object representing a ray between two 3D points. - The line is colored yellow. - - Args: - start (np.ndarray): The start point of the ray. - end (np.ndarray): The end point of the ray. - - Returns: - o3d.geometry.LineSet: The LineSet object representing the ray. """ points = [start, end] lines = [[0, 1]] @@ -458,17 +179,25 @@ def create_ray_line_set(start, end): line_set.colors = o3d.utility.Vector3dVector([[1, 1, 0]]) return line_set +def downsample_points(lidar_points, voxel_size=0.15): + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(lidar_points) + down_pcd = pcd.voxel_down_sample(voxel_size=voxel_size) + return np.asarray(down_pcd.points) + +def filter_depth_points(lidar_points, max_human_depth=0.9): + + if lidar_points.shape[0] == 0: + return lidar_points + lidar_points_dist = lidar_points[:, 0] + min_dist = np.min(lidar_points_dist) + max_possible_dist = min_dist + max_human_depth + filtered_array = lidar_points[lidar_points_dist < max_possible_dist] + return filtered_array def visualize_geometries(geometries, window_name="Open3D", width=800, height=600, point_size=5.0): """ Visualize a list of Open3D geometry objects in a dedicated window. - - Args: - geometries (list): A list of Open3D geometry objects. - window_name (str): Title of the visualization window. - width (int): Width of the window. - height (int): Height of the window. - point_size (float): Size of the rendered points. """ vis = o3d.visualization.Visualizer() vis.create_window(window_name=window_name, width=width, height=height) @@ -479,15 +208,73 @@ def visualize_geometries(geometries, window_name="Open3D", width=800, height=600 vis.run() vis.destroy_window() - -# ----- Pedestrian Detector 2D (with 3D Fusion and Synchronized Callbacks) ----- +def pose_to_matrix(pose): + """ + Compose a 4x4 transformation matrix from a pose state. + Assumes pose has attributes: x, y, z, yaw, pitch, roll, + where the angles are given in degrees. + """ + # Use default values if any are None (e.g. if the car is not moving) + x = pose.x if pose.x is not None else 0.0 + y = pose.y if pose.y is not None else 0.0 + z = pose.z if pose.z is not None else 0.0 + if pose.yaw is not None and pose.pitch is not None and pose.roll is not None: + yaw = math.radians(pose.yaw) + pitch = math.radians(pose.pitch) + roll = math.radians(pose.roll) + else: + yaw = 0.0 + pitch = 0.0 + roll = 0.0 + R_mat = R.from_euler('zyx', [yaw, pitch, roll]).as_matrix() + T = np.eye(4) + T[:3, :3] = R_mat + T[:3, 3] = np.array([x, y, z]) + return T + + +def transform_points_l2c(lidar_points, T_l2c): + N = lidar_points.shape[0] + pts_hom = np.hstack((lidar_points, np.ones((N, 1)))) # (N,4) + pts_cam = (T_l2c @ pts_hom.T).T # (N,4) + return pts_cam[:, :3] + +# ----- New: Vectorized projection function ----- +def project_points(pts_cam, K, original_lidar_points): + """ + Vectorized version. + pts_cam: (N,3) array of points in camera coordinates. + original_lidar_points: (N,3) array of points in LiDAR coordinates. + Returns a (M,5) array: [u, v, X_lidar, Y_lidar, Z_lidar] for all points with Z>0. 
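+    Pixel coordinates follow the pinhole model: u = fx * Xc / Zc + cx and
+    v = fy * Yc / Zc + cy, truncated to integer pixel indices.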
+ """ + mask = pts_cam[:, 2] > 0 + pts_cam_valid = pts_cam[mask] + lidar_valid = original_lidar_points[mask] + Xc = pts_cam_valid[:, 0] + Yc = pts_cam_valid[:, 1] + Zc = pts_cam_valid[:, 2] + u = (K[0, 0] * (Xc / Zc) + K[0, 2]).astype(np.int32) + v = (K[1, 1] * (Yc / Zc) + K[1, 2]).astype(np.int32) + proj = np.column_stack((u, v, lidar_valid)) + return proj + + +# ----- Pedestrian Detector 2D (New Approach) ----- class PedestrianDetector2D(Component): """ - Detects pedestrians by fusing 2D YOLO detections with LiDAR point cloud data to estimate 3D poses. + Detects pedestrians by fusing YOLO 2D detections with LiDAR point cloud data. + + New approach: + 1) Downsample the LiDAR point cloud. + 2) Transform it to the camera coordinate system. + 3) Project all points onto the image plane. + 4) Filter the projected points using the YOLO 2D bounding boxes. + 5) Apply DBSCAN clustering and remove ground points. + 6) Compute an oriented bounding box and transform to the Vehicle frame. + 7) (Optional) Reproject the refined cluster onto the image. - This implementation uses message_filters to synchronize incoming RGB images and LiDAR data. - The synchronized callback caches the latest sensor data for processing in the update() method. + (Note: Do not add any additional visualization beyond what is necessary.) """ def __init__(self, vehicle_interface: GEMInterface): @@ -495,228 +282,166 @@ def __init__(self, vehicle_interface: GEMInterface): self.current_agents = {} self.tracked_agents = {} self.pedestrian_counter = 0 - # Variables to store the most recent synchronized sensor data: self.latest_image = None self.latest_lidar = None self.bridge = CvBridge() + self.start_pose_abs = None def rate(self) -> float: - # Process data at 4 Hz. return 4.0 def state_inputs(self) -> list: - # Expects vehicle state information as input. return ['vehicle'] def state_outputs(self) -> list: - # Outputs detected pedestrian agents. return ['agents'] def initialize(self): - """ - Initialize sensor subscriptions and the YOLO detector. - - Uses message_filters to synchronize the image and LiDAR data streams. - Also sets up the camera intrinsic matrix and LiDAR-to-camera (and vehicle) transformations. - """ self.rgb_sub = Subscriber('/oak/rgb/image_raw', Image) self.lidar_sub = Subscriber('/ouster/points', PointCloud2) self.sync = ApproximateTimeSynchronizer([self.rgb_sub, self.lidar_sub], queue_size=10, slop=0.1) self.sync.registerCallback(self.synchronized_callback) - # Initialize YOLO detector with the pretrained model and set to CUDA. self.detector = YOLO('../../knowledge/detection/yolov8s.pt') self.detector.to('cuda') - # Camera intrinsic matrix. self.K = np.array([[684.83331299, 0., 573.37109375], [0., 684.60968018, 363.70092773], [0., 0., 1.]]) - # LiDAR-to-vehicle transformation matrix. self.T_l2v = np.array([[0.99939639, 0.02547917, 0.023615, 1.1], [-0.02530848, 0.99965156, -0.00749882, 0.03773583], [-0.02379784, 0.00689664, 0.999693, 1.95320223], [0., 0., 0., 1.]]) - # LiDAR-to-camera transformation matrix. self.T_l2c = np.array([ [0.001090, -0.999489, -0.031941, 0.149698], [-0.007664, 0.031932, -0.999461, -0.397813], [0.999970, 0.001334, -0.007625, -0.691405], - [0.000000, 0.000000, 0.000000, 1.000000] + [0., 0., 0., 1.000000] ]) - # Inverse transformation: camera-to-LiDAR. self.T_c2l = np.linalg.inv(self.T_l2c) self.R_c2l = self.T_c2l[:3, :3] - # Camera origin expressed in LiDAR coordinates. 
self.camera_origin_in_lidar = self.T_c2l[:3, 3] def synchronized_callback(self, image_msg, lidar_msg): - """ - Callback function that is invoked when an image and a LiDAR message are received within the allowed time difference. - - This function converts the ROS messages into usable formats: - - The image is converted to an OpenCV BGR image. - - The LiDAR message is converted to a numpy array. - """ + step1 = time.time() try: self.latest_image = self.bridge.imgmsg_to_cv2(image_msg, "bgr8") except Exception as e: rospy.logerr("Failed to convert image: {}".format(e)) self.latest_image = None + step2 = time.time() self.latest_lidar = pc2_to_numpy(lidar_msg, want_rgb=False) + step3 = time.time() + print('image callback: ', step2-step1, 'lidar callback ', step3- step2) def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: - """ - Process the latest synchronized sensor data to detect pedestrians and estimate their 3D poses. - - This method performs the following: - 1. Runs YOLO on the latest image to obtain 2D pedestrian detections. - 2. Backprojects the 2D detections into 3D rays in the LiDAR frame. - 3. Uses LiDAR data along the ray to estimate a 3D position (and size) for the pedestrian. - 4. Matches detections to existing tracked agents or creates new ones. - - Args: - vehicle (VehicleState): The current vehicle state. - - Returns: - Dict[str, AgentState]: Dictionary mapping agent IDs to their updated states. - """ - # Ensure both image and LiDAR data are available. + downsample = False if self.latest_image is None or self.latest_lidar is None: return {} current_time = self.vehicle_interface.time() - # Run YOLO inference on the current image (detecting only class 0, e.g., persons). + # Run YOLO to obtain 2D detections (class 0: persons) results = self.detector(self.latest_image, conf=0.4, classes=[0]) - boxes = np.array(results[0].boxes.xywh.cpu()) # Format: [center_x, center_y, width, height] - + boxes = np.array(results[0].boxes.xywh.cpu()) agents = {} - lidar_pc = self.latest_lidar.copy() + if downsample == True: + lidar_down = downsample_points(self.latest_lidar, voxel_size=0.1) + pts_cam = transform_points_l2c(lidar_down, self.T_l2c) + projected_pts = project_points(pts_cam, self.K, lidar_down) + + else: + # New approach: project the entire LiDAR point cloud to the image plane + step00 = time.time() + lidar_down = self.latest_lidar.copy() + step01 = time.time() + pts_cam = transform_points_l2c(lidar_down, self.T_l2c) + step02 = time.time() + projected_pts = project_points(pts_cam, self.K, lidar_down) # shape (N,5): [u, v, X, Y, Z] + step03 = time.time() + print(f'copy lidar data {step01-step00}s, transoforming to camear {step02-step01}s, transforming to image {step03-step02}s') + + # For each 2D bounding box, filter projected points instead of ray-casting for i, box in enumerate(boxes): + start = time.time() cx, cy, w, h = box - # Backproject the 2D pixel center into a 3D ray in the LiDAR coordinate frame. - ray_dir_cam = backproject_pixel(cx, cy, self.K) - ray_dir_lidar = self.R_c2l @ ray_dir_cam - ray_dir_lidar /= np.linalg.norm(ray_dir_lidar) - - # Attempt to locate the pedestrian's center along the ray using LiDAR data. 
- intersection, _, _ = find_human_center_on_ray( - lidar_pc, self.camera_origin_in_lidar, ray_dir_lidar, - t_min=0.4, t_max=25.0, t_step=0.1, - distance_threshold=0.5, min_points=5, ransac_threshold=0.05 - ) - if intersection is None: - # If no valid intersection is found, update the positions of already tracked agents - # based on the vehicle's forward velocity and time elapsed. - for agent in self.tracked_agents.values(): - dt = current_time - agent.pose.t - agent.pose.x -= vehicle.v * dt # Adjust x-coordinate assuming vehicle moves forward along x. - agent.pose.t = current_time + left = int(cx - w / 2) + right = int(cx + w / 2) + top = int(cy - h / 2) + bottom = int(cy + h / 2) + mask = (projected_pts[:, 0] >= left) & (projected_pts[:, 0] <= right) & \ + (projected_pts[:, 1] >= top) & (projected_pts[:, 1] <= bottom) + roi_pts = projected_pts[mask] + if roi_pts.shape[0] < 5: continue + # Extract the LiDAR 3D points corresponding to the ROI + points_3d = roi_pts[:, 2:5] + points_3d = filter_depth_points(points_3d, max_human_depth=0.8) + - # Estimate the pedestrian's physical dimensions using the detection box size and distance. - d = np.linalg.norm(intersection - self.camera_origin_in_lidar) - physical_width = (w * d) / self.K[0, 0] - physical_height = (h * d) / self.K[1, 1] - # Since the depth (x-direction) measurement is less reliable, use an empirical value. - half_extents = np.array([0.4, 1.1 * physical_width / 2, 1.1 * physical_height / 2]) - - # Extract LiDAR points within the ROI box around the estimated intersection. - roi_points = extract_roi_box(lidar_pc, intersection, half_extents) - if roi_points.shape[0] < 10: - refined_cluster = roi_points - else: - refined_cluster = refine_cluster(roi_points, intersection, eps=0.15, min_samples=10) - # Remove ground points from the refined cluster. + # Cluster the points and remove ground + refined_cluster = refine_cluster(points_3d, np.mean(points_3d, axis=0), eps=0.15, min_samples=10) refined_cluster = remove_ground_by_min_range(refined_cluster, z_range=0.03) - if refined_cluster is None or refined_cluster.shape[0] == 0: - # Fallback to the initial intersection if no valid cluster remains. - refined_center = intersection - dims = (0, 0, 0) - yaw, pitch, roll = 0, 0, 0 - elif refined_cluster.shape[0] <= 5: - # Skip detections with insufficient LiDAR points. + end1 = time.time() + print('refine cluster: ', end1-start) + if refined_cluster.shape[0] < 5: continue - else: - # Create an Open3D point cloud from the refined cluster and compute its oriented bounding box. - pcd = o3d.geometry.PointCloud() - pcd.points = o3d.utility.Vector3dVector(refined_cluster) - obb = pcd.get_oriented_bounding_box() - refined_center = obb.center - dims = tuple(obb.extent) - R_lidar = obb.R.copy() - - # Transform the refined center from LiDAR coordinates to the vehicle frame. - refined_center_lidar_hom = np.array([refined_center[0], - refined_center[1], - refined_center[2], - 1.0]) - refined_center_vehicle_hom = self.T_l2v @ refined_center_lidar_hom - refined_center_vehicle = refined_center_vehicle_hom[:3] - - # Compute the orientation (yaw, pitch, roll) in the vehicle frame. 
- R_vehicle = self.T_l2v[:3, :3] @ R_lidar - euler_angles_vehicle = R.from_matrix(R_vehicle).as_euler('zyx', degrees=False) - yaw, pitch, roll = euler_angles_vehicle - refined_center = refined_center_vehicle # Use vehicle frame coordinates for output - - ''' - Note that this part can be used for converting the speed relative to vehicle -> speed relative to START frame - However, the VehicleState may read in Global coordinate (i.e. long and lat) instead of relative to START frame - In order to avoid this issue, we found it is actually easier to implement this in the planning code. - Planning function has access to Allstate instead of just Vehicle state, - therefore it is easier to specify which coordinate system to use - ''' - ''' - i.e. Simliar function is implemented in GEMstack/onboard/planning/pedestrian_yield_logic.py L569-573 - # If the pedestrian's frame is CURRENT, convert the pedestrian's frame to START. - elif a.pose.frame == ObjectFrameEnum.CURRENT: - a_x = a.pose.x + curr_x - a_y = a.pose.y + curr_y - a_v_x = a_v_x - curr_v - ''' - # curr_x = vehicle.pose.x - # curr_y = vehicle.pose.y - # curr_yaw = vehicle.pose.yaw - # curr_pitch = vehicle.pose.pitch - # curr_roll = vehicle.pose.roll - # refined_center[0] += curr_x - # refined_center[1] += curr_y - # euler_angles_vehicle[0] += curr_yaw - # euler_angles_vehicle[1] += curr_pitch - # euler_angles_vehicle[2] += curr_roll - - # Create a new pose for the detected pedestrian in the vehicle frame. + + # Compute the oriented bounding box + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(refined_cluster) + obb = pcd.get_oriented_bounding_box() + refined_center = obb.center + dims = tuple(obb.extent) + R_lidar = obb.R.copy() + end2 = time.time() + print('compute bounding box ', end2-end1) + # Transform the refined center from LiDAR to Vehicle frame + refined_center_hom = np.append(refined_center, 1) + refined_center_vehicle_hom = self.T_l2v @ refined_center_hom + refined_center_vehicle = refined_center_vehicle_hom[:3] + + R_vehicle = self.T_l2v[:3, :3] @ R_lidar + euler_vehicle = R.from_matrix(R_vehicle).as_euler('zyx', degrees=False) + yaw, pitch, roll = euler_vehicle + refined_center = refined_center_vehicle + + # Convert from Vehicle frame to START frame + if self.start_pose_abs is None: + self.start_pose_abs = vehicle.pose # Initialize once + + # Obtain the vehicle's pose in the START frame as a pose state. + # Assume vehicle.pose.to_frame returns a pose state with attributes x, y, z, yaw, pitch, roll. + vehicle_start_pose = vehicle.pose.to_frame(ObjectFrameEnum.START, vehicle.pose, self.start_pose_abs) + + # Compose the 4x4 transformation matrix from the vehicle_start_pose + T_vehicle_to_start = pose_to_matrix(vehicle_start_pose) + + # Transform the refined center (in Vehicle frame) to the START frame + refined_center_hom_vehicle = np.append(refined_center, 1) + refined_center_start = (T_vehicle_to_start @ refined_center_hom_vehicle)[:3] + new_pose = ObjectPose( t=current_time, - x=refined_center[0], - y=refined_center[1], - z=refined_center[2], + x=refined_center_start[0], + y=refined_center_start[1], + z=refined_center_start[2], yaw=yaw, pitch=pitch, roll=roll, - frame=ObjectFrameEnum.CURRENT + frame=ObjectFrameEnum.START ) - # Attempt to match the new detection with an existing tracked pedestrian. 
existing_id = match_existing_pedestrian( new_center=np.array([new_pose.x, new_pose.y, new_pose.z]), new_dims=dims, existing_agents=self.tracked_agents, distance_threshold=2.0 ) - if existing_id is not None: - # Update the state of the matched pedestrian using the computed velocity. - old_agent_state = self.tracked_agents[existing_id] - dt = new_pose.t - old_agent_state.pose.t - ''' - We found that the velocity calculated here is not entirely stable. - We are implementing a more stable method, e.g. kalman filter based one - ''' - vx, vy, vz = compute_velocity(old_agent_state.pose, new_pose, dt) - + old_state = self.tracked_agents[existing_id] + dt = new_pose.t - old_state.pose.t + vx, vy, vz = compute_velocity(old_state.pose, new_pose, dt) updated_agent = AgentState( pose=new_pose, dimensions=dims, @@ -729,10 +454,8 @@ def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: agents[existing_id] = updated_agent self.tracked_agents[existing_id] = updated_agent else: - # If no match is found, create a new pedestrian agent. agent_id = f"pedestrian{self.pedestrian_counter}" self.pedestrian_counter += 1 - new_agent = AgentState( pose=new_pose, dimensions=dims, @@ -750,24 +473,22 @@ def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: stale_ids = [agent_id for agent_id, agent in self.tracked_agents.items() if current_time - agent.pose.t > 5.0] for agent_id in stale_ids: - rospy.loginfo(f"Removing stale agent: {agent_id}") - del self.tracked_agents[agent_id] - # Log the details of each detected agent. + rospy.loginfo(f"Removing stale agent: {agent_id}\n") for agent_id, agent in agents.items(): - rospy.loginfo(f"Agent ID: {agent_id}, Pose: {agent.pose}, Velocity: {agent.velocity}") - + p = agent.pose + # Format pose and velocity with 3 decimals (or as needed) + rospy.loginfo( + f"Agent ID: {agent_id}\n" + f"Pose: (x: {p.x:.3f}, y: {p.y:.3f}, z: {p.z:.3f}, " + f"yaw: {p.yaw:.3f}, pitch: {p.pitch:.3f}, roll: {p.roll:.3f})\n" + f"Velocity: (vx: {agent.velocity[0]:.3f}, vy: {agent.velocity[1]:.3f}, vz: {agent.velocity[2]:.3f})\n" + ) return agents # ----- Fake Pedestrian Detector 2D (for Testing Purposes) ----- class FakePedestrianDetector2D(Component): - """ - A dummy pedestrian detector that simulates detections during pre-defined time intervals. - - This component is useful for testing the overall system without relying on real sensor data. - """ - def __init__(self, vehicle_interface: GEMInterface): self.vehicle_interface = vehicle_interface self.times = [(5.0, 20.0), (30.0, 35.0)] @@ -795,17 +516,6 @@ def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: def box_to_fake_agent(box): - """ - Create a fake AgentState from a bounding box represented as (x, y, width, height). - - The pose is computed using the center of the box, and the dimensions are treated as 2D approximations. - - Args: - box (tuple): A tuple (x, y, w, h) representing the bounding box. - - Returns: - AgentState: The simulated pedestrian agent. - """ x, y, w, h = box pose = ObjectPose(t=0, x=x + w / 2, y=y + h / 2, z=0, yaw=0, pitch=0, roll=0, frame=ObjectFrameEnum.CURRENT) dims = (w, h, 0) @@ -815,6 +525,4 @@ def box_to_fake_agent(box): if __name__ == '__main__': - # This module is intended to be used within the vehicle interface context. - # For standalone testing, instantiate a fake vehicle state and call update(). 
pass diff --git a/homework/cone.pt b/homework/cone.pt new file mode 100644 index 000000000..002a39ede Binary files /dev/null and b/homework/cone.pt differ diff --git a/launch/cone_detection.yaml b/launch/cone_detection.yaml new file mode 100644 index 000000000..b30ccf26b --- /dev/null +++ b/launch/cone_detection.yaml @@ -0,0 +1,119 @@ +description: "Run the yielding trajectory planner on the real vehicle with real perception" +mode: hardware +vehicle_interface: gem_hardware.GEMHardwareInterface +mission_execution: StandardExecutor + +# Recovery behavior after a component failure +recovery: + planning: + trajectory_tracking : recovery.StopTrajectoryTracker + +# Driving behavior for the GEM vehicle. Runs real pedestrian perception, yield planner, but does not send commands to real vehicle. +drive: + perception: + state_estimation : GNSSStateEstimator + agent_detection : cone_detection.ConeDetector3D + perception_normalization : StandardPerceptionNormalizer + planning: + relations_estimation: + type: pedestrian_yield_logic.PedestrianYielder + args: + mode: 'real' + params: { + 'yielder': 'expert', # 'expert', 'analytic', or 'simulation' + 'planner': 'milestone', # 'milestone', 'dt', or 'dx' + 'desired_speed': 1.0, # m/s, 1.5 m/s seems max speed? Feb24 + 'acceleration': 0.75 # m/s2, 0.5 is not enough to start moving. Feb24 + } + route_planning: + type: StaticRoutePlanner + args: [!relative_path '../GEMstack/knowledge/routes/forward_15m.csv','start'] + motion_planning: longitudinal_planning.YieldTrajectoryPlanner + trajectory_tracking: + type: pure_pursuit.PurePursuitTrajectoryTracker + print: False + + +log: + # Specify the top-level folder to save the log files. Default is 'logs' + #folder : 'logs' + # If prefix is specified, then the log folder will be named with the prefix followed by the date and time. Default no prefix + #prefix : 'fixed_route_' + # If suffix is specified, then logs will output to folder/prefix+suffix. Default uses date and time as the suffix + #suffix : 'test3' + # Specify which ros topics to record to vehicle.bag. Default records nothing. This records the "standard" ROS topics. + ros_topics : + # Specify options to pass to rosbag record. Default is no options. + #rosbag_options : '--split --size=1024' + # If True, then record all readings / commands of the vehicle interface. Default False + vehicle_interface : True + # Specify which components to record to behavior.json. Default records nothing + components : ['state_estimation','agent_detection','motion_planning'] + # Specify which components of state to record to state.json. Default records nothing + #state: ['all'] + # Specify the rate in Hz at which to record state to state.json. Default records at the pipeline's rate + #state_rate: 10 + # If True, then make plots of the recorded data specifically from PurePursuitTrajectoryTracker_debug.csv and report metrics. 
Default False + #auto_plot : True +replay: # Add items here to set certain topics / inputs to be replayed from logs + # Specify which log folder to replay from + log: + ros_topics : [] + components : [] + +#usually can keep this constant +computation_graph: !include "../GEMstack/knowledge/defaults/computation_graph.yaml" + +variants: + detector_only: + run: + description: "Run the pedestrian detection code" + drive: + planning: + trajectory_tracking: + real_sim: + run: + description: "Run the pedestrian detection code with real detection and fake simulation" + mode: hardware + vehicle_interface: + type: gem_mixed.GEMRealSensorsWithSimMotionInterface + args: + scene: !relative_path '../scenes/xyhead_demo.yaml' + mission_execution: StandardExecutor + require_engaged: False + visualization: !include "klampt_visualization.yaml" + drive: + perception: + state_estimation : OmniscientStateEstimator + planning: + relations_estimation: + type: pedestrian_yield_logic.PedestrianYielder + args: + mode: 'sim' + + fake_real: + run: + description: "Run the yielding trajectory planner on the real vehicle with faked perception" + drive: + perception: + agent_detection : cone_detection.FakeConeDetector2D + + fake_sim: + run: + description: "Run the yielding trajectory planner in simulation with faked perception" + mode: simulation + vehicle_interface: + type: gem_simulator.GEMDoubleIntegratorSimulationInterface + args: + scene: !relative_path '../scenes/xyhead_demo.yaml' + visualization: !include "klampt_visualization.yaml" + drive: + perception: + # agent_detection : pedestrian_detection.FakePedestrianDetector2D + agent_detection : OmniscientAgentDetector #this option reads agents from the simulator + state_estimation : OmniscientStateEstimator + planning: + relations_estimation: + type: pedestrian_yield_logic.PedestrianYielder + args: + mode: 'sim' \ No newline at end of file