import math
from typing import Dict, List, Tuple

from GEMstack.onboard.perception.IdTracker import IdTracker
from GEMstack.onboard.perception.PrevAgent import PrevAgent
from GEMstack.state.agent import AgentState


class AgentTracker():
    """Associates AgentState detections across frames and assigns stable ids.

    Matching rule: two detections are considered the same agent when their
    axis-aligned (x, y) bounding boxes, built from pose and dimensions,
    overlap.  Agents that go unmatched for more than ``drop_agent_t``
    seconds are dropped.  All poses are assumed to already be expressed in
    the vehicle start frame.
    """

    def __init__(self):
        # Agents seen in earlier frames (id, last state, time since seen).
        self.prev_agents: List[PrevAgent] = []
        # Agents matched or created in the most recent assign_ids() call.
        self.current_agents: List[AgentState] = []
        # Maximum time (seconds) a lost agent is retained before dropping.
        self.drop_agent_t: float = 1.0
        # Generator of unique pedestrian ids.
        self.id_tracker = IdTracker()

    def assign_ids(self, agents: list, dt: float = 1.0) -> Dict[str, AgentState]:
        """Associates new AgentState objects with tracked agents by overlap.

        Agents that overlap a previously tracked agent inherit its id and get
        a velocity estimate from the displacement; unmatched agents receive a
        fresh id.  Tracked agents not matched this frame accumulate lost time
        and are dropped once they exceed ``self.drop_agent_t``.

        Parameters:
            agents (list): AgentState objects for the current frame
                (already converted to the start frame of reference).
            dt (float): Time step in seconds between frames.  Defaults to
                1.0 to preserve the original fixed-step behavior.

        Returns:
            Dict[str, AgentState]: Mapping from agent id (as a string) to
            AgentState.
        """
        output_agents: Dict[str, AgentState] = {}
        matched_ids = set()
        updated_prev_agents: List[PrevAgent] = []

        for new_agent in agents:
            # Greedy first-overlap association against unmatched tracks.
            found_match = None
            for prev in self.prev_agents:
                if prev.last_id in matched_ids:
                    continue  # Already claimed by another new agent.
                if self.__agents_overlap(prev.last_state, new_agent):
                    found_match = prev
                    break

            if found_match is not None:
                # Estimate velocity from the displacement since last seen.
                # Fix: stored as a (vx, vy, vz) tuple for consistency with
                # AgentState.velocity usage elsewhere (e.g. velocity=(0,0,0)),
                # instead of the scalar speed the original assigned.
                if hasattr(new_agent, 'velocity'):
                    new_agent.velocity = self.__calculate_velocity(
                        found_match.last_state, new_agent, dt)
                # Refresh the track with the new state and reset lost time.
                found_match.last_state = new_agent
                found_match.time_since_seen = 0.0
                matched_ids.add(found_match.last_id)
                output_agents[str(found_match.last_id)] = new_agent
                updated_prev_agents.append(found_match)
            else:
                # No overlap with any track: start a new one.
                new_id = self.id_tracker.get_new_ped_id()
                if hasattr(new_agent, 'velocity'):
                    new_agent.velocity = (0.0, 0.0, 0.0)
                output_agents[str(new_id)] = new_agent
                updated_prev_agents.append(PrevAgent(new_id, new_agent))

        # Age out tracks that were not matched this frame.
        for prev in self.prev_agents:
            if prev.last_id not in matched_ids:
                prev.update_time(dt)
                if prev.time_since_seen < self.drop_agent_t:
                    updated_prev_agents.append(prev)

        self.prev_agents = updated_prev_agents
        # current_agents holds only agents visible in this frame.
        self.current_agents = list(output_agents.values())
        return output_agents

    def __convert_to_start_frame(self):
        """Converts AgentState agents from the ouster LiDAR frame (relative
        to the current vehicle pose) to the start frame of reference.

        TODO: not yet implemented.
        """
        pass

    def __agents_overlap(self, ped1, ped2) -> bool:
        """Returns True if the two agents' (x, y) bounding boxes overlap.

        Assumes each AgentState has a pose with ``x``/``y`` attributes and
        dimensions with ``width``/``height``, with (x, y) at the box center.
        """
        x1, y1 = ped1.pose.x, ped1.pose.y
        w1, h1 = ped1.dimensions.width, ped1.dimensions.height
        x2, y2 = ped2.pose.x, ped2.pose.y
        w2, h2 = ped2.dimensions.width, ped2.dimensions.height

        left1, right1 = x1 - w1 / 2, x1 + w1 / 2
        top1, bottom1 = y1 - h1 / 2, y1 + h1 / 2
        left2, right2 = x2 - w2 / 2, x2 + w2 / 2
        top2, bottom2 = y2 - h2 / 2, y2 + h2 / 2

        # Boxes overlap unless one is entirely to one side of the other.
        return not (right1 < left2 or left1 > right2 or
                    bottom1 < top2 or top1 > bottom2)

    def __calculate_velocity(self, old_state: AgentState, new_state: AgentState,
                             dt: float) -> Tuple[float, float, float]:
        """Returns the (vx, vy, vz) velocity implied by the pose change.

        Parameters:
            old_state (AgentState): The previous state.
            new_state (AgentState): The current state.
            dt (float): Time difference between the two states in seconds.

        Returns:
            Tuple[float, float, float]: Per-axis velocity; zero when dt <= 0.
            (The original returned the scalar speed, inconsistent with the
            tuple-valued velocity used elsewhere in the codebase.)
        """
        if dt <= 0:
            return (0.0, 0.0, 0.0)
        dx = new_state.pose.x - old_state.pose.x
        dy = new_state.pose.y - old_state.pose.y
        return (dx / dt, dy / dt, 0.0)
class IdTracker():
    """Hands out unique, monotonically increasing pedestrian ids."""

    def __init__(self):
        # Next pedestrian id to assign.
        self.__ped_id = 0

    def get_new_ped_id(self) -> int:
        """Returns a unique pedestrian id (0, 1, 2, ...)."""
        # Fix: the original read the non-existent attribute self.__id
        # (name-mangled to _IdTracker__id), raising AttributeError on the
        # very first call.
        assigned_id = self.__ped_id
        # NOTE: Python ints are arbitrary precision and never overflow, so
        # ids grow without bound; the original "intentionally overflow back
        # to 0" comment was incorrect.
        self.__ped_id += 1
        return assigned_id


class PrevAgent():
    """Bookkeeping record for a previously seen agent: its id, its last
    observed state, and how long ago it was last matched."""

    def __init__(self, last_id: int, last_state: "AgentState"):
        # Time since the agent was last seen, in seconds.
        self.time_since_seen: float = 0.0
        # The id assigned to this agent by the tracker.
        self.last_id: int = last_id
        # The most recently observed AgentState for this agent.
        self.last_state = last_state

    def update_time(self, time: float) -> None:
        """Adds ``time`` seconds to the time since the agent was last seen.

        Non-positive values are rejected (logged) and leave the counter
        unchanged.
        """
        # Fix: the original definition was missing `self`, so any call
        # (e.g. prev.update_time(dt)) raised TypeError, and the body's
        # references to `self` would have raised NameError.
        if time <= 0:
            # TODO: route through the project logger instead of print.
            print("UPDATE TIME FOR LOST AGENT WAS LESS THAN OR EQUAL TO 0")
        else:
            self.time_since_seen += time
but list for self.current_agents = [] # simplicity to match update() output to start - self.visualization = True self.confidence = 0.7 self.classes_to_detect = 0 self.ground_threshold = 1.6 self.max_dist_percent = 0.7 + # Setup visualization variables + self.visualization = True # Set this to true for visualization, later change to get value from sys arg + # Load calibration data self.R = load_extrinsics(os.getcwd() + '/GEMstack/onboard/perception/calibration/extrinsics/R.npy') self.t = load_extrinsics(os.getcwd() + '/GEMstack/onboard/perception/calibration/extrinsics/t.npy') self.K = load_intrinsics(os.getcwd() + '/GEMstack/onboard/perception/calibration/camera_intrinsics.json') - # Subscribers and sychronizers - self.rgb_rosbag = message_filters.Subscriber('/oak/rgb/image_raw', Image) - self.top_lidar_rosbag = message_filters.Subscriber('/ouster/points', PointCloud2) - self.sync = message_filters.ApproximateTimeSynchronizer([self.rgb_rosbag, self.top_lidar_rosbag], queue_size=10, slop=0.1) - self.sync.registerCallback(self.fusion_callback) - # TF listener to get transformation from LiDAR to Camera - self.tf_listener = tf.TransformListener() + # self.tf_listener = tf.TransformListener()S # Publishers self.pub_pedestrians_pc2 = rospy.Publisher("/point_cloud/pedestrians", PointCloud2, queue_size=10) if(self.visualization): self.pub_image = rospy.Publisher("/camera/image_detection", Image, queue_size=1) - # TODO: refactor into pedestrian_detection.py - def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: - self.prev_agents = self.current_agents - return self.prev_agents - - # clusters: list[ np.array(shape = (N, XYZ) ] - # TODO: Improve Algo Knn, ransac, etc. 
- def find_centers(clusters: list[np.ndarray]): - centers = [np.mean(clust, axis=0) for clust in clusters] - return centers - - # Beware: AgentState(PhysicalObject) builds bbox from - # dims [-l/2,l/2] x [-w/2,w/2] x [0,h], not - # [-l/2,l/2] x [-w/2,w/2] x [-h/2,h/2] - def find_dims(clusters): - # Add some small constant to height to compensate for - # objects distance to ground we filtered from lidar, etc.? - dims = [np.max(clust, axis= 0) - np.min(clust, axis= 0) for clust in clusters] - return dims - - # track_result: self.detector.track (YOLO) output - # flattened_pedestrians_3d_pts: Camera frame points, ind - # corresponds to object - # TODO: Finish adapt this func + fusion.py to multiple classes - def update_object_states(track_result, flattened_pedestrians_3d_pts: list[np.ndarray]): - track_ids = track_result[0].boxes.id.int().cpu().tolist() - num_objs = len(track_ids) - - if len(flattened_pedestrians_3d_pts) != num_objs: - raise Exception('Perception - Camera detections, points num. mismatch') - - # Combine funcs for efficiency in C - # Separate numpy prob still faster - obj_centers = find_centers(flattened_pedestrians_3d_pts) - obj_dims = find_dims(flattened_pedestrians_3d_pts) - - # Calculate new velocities - # TODO: Improve velocity algo + remove O(n) dict - # Moving Average across last N iterations pos/vel? 
- track_id_center_map = dict(zip(track_ids, obj_centers)) - # Object not seen -> velocity = None - vels = defaultdict(None) - for prev_agent in self.prev_agents: - if prev_agent.track_id in track_ids: - vels[track_id] = track_id_center_map[track_id] - np.array([prev_agent.pose.x, prev_agent.pose.y, prev_agent.pose.z]) - - # Update Current AgentStates - self.current_agents = [ - AgentState( - track_id = track_ids[ind] - pose=ObjectPose(t=0,obj_centers[0],obj_centers[1],obj_centers[2],yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT), - # (l, w, h) - # TODO: confirm (z -> l, x -> w, y -> h) - dimensions=tuple(obj_dims[ind][2], obj_dims[ind][0], obj_dims[ind][1]), - outline=None, - type=AgentEnum.PEDESTRIAN, - activity=AgentActivityEnum.MOVING, - velocity=tuple(vels[track_ids[ind]]), - yaw_rate=0 - ) - for ind in len(num_objs) - ] - - def fusion_callback(self, rgb_image_msg: Image, lidar_pc2_msg: PointCloud2): # Convert to cv2 image and run detector cv_image = self.bridge.imgmsg_to_cv2(rgb_image_msg, "bgr8") @@ -195,10 +129,9 @@ def fusion_callback(self, rgb_image_msg: Image, lidar_pc2_msg: PointCloud2): self.pub_image.publish(ros_img) - -if __name__ == '__main__': - rospy.init_node('fusion_node', anonymous=True) - Fusion3D() - while not rospy.core.is_shutdown(): - rospy.rostime.wallsleep(0.5) +# if __name__ == '__main__': +# rospy.init_node('fusion_node', anonymous=True) +# Fusion3D() +# while not rospy.core.is_shutdown(): +# rospy.rostime.wallsleep(0.5) diff --git a/GEMstack/onboard/perception/pedestrian_detection.py b/GEMstack/onboard/perception/pedestrian_detection.py index f91145274..bdb5b8d66 100644 --- a/GEMstack/onboard/perception/pedestrian_detection.py +++ b/GEMstack/onboard/perception/pedestrian_detection.py @@ -1,27 +1,47 @@ -from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum +from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum #, AllState 
class PedestrianDetector2DShared(Component):
    """Base component for the pedestrian detectors.

    Holds the vehicle interface and the Component plumbing (rate, state
    inputs/outputs) shared by the real and fake detectors.

    self.vehicle_interface -> GEM car info
    """

    def __init__(self, vehicle_interface: GEMInterface):
        self.vehicle_interface = vehicle_interface

    def rate(self):
        # Component update frequency in Hz.
        return 4.0

    def state_inputs(self):
        return ['vehicle']

    def state_outputs(self):
        return ['agents']

    def box_to_fake_agent(self, box) -> AgentState:
        """Creates a fake agent state from an (x, y, w, h) bounding box.

        The location and size are pretty much meaningless since this is just
        giving a 2D location.
        """
        # Fix: the original definition was missing `self`, so calls via
        # self.box_to_fake_agent(box) raised TypeError (box bound to self).
        x, y, w, h = box
        pose = ObjectPose(t=0, x=x + w / 2, y=y + h / 2, z=0,
                          yaw=0, pitch=0, roll=0, frame=ObjectFrameEnum.CURRENT)
        dims = (w, h, 0)
        return AgentState(pose=pose, dimensions=dims, outline=None,
                          type=AgentEnum.PEDESTRIAN,
                          activity=AgentActivityEnum.MOVING,
                          velocity=(0, 0, 0), yaw_rate=0)
self.vehicle -> GEM car info @@ -34,25 +54,14 @@ class PedestrianDetector2D(Component): """ def __init__(self,vehicle_interface : GEMInterface): - self.vehicle_interface = vehicle_interface self.detector = YOLO(os.getcwd()+'/GEMstack/knowledge/detection/yolov8n.pt') # change to get model value from sys arg self.last_person_boxes = [] - self.pedestrians = {} - self.visualization = True # Set this to true for visualization, later change to get value from sys arg - self.confidence = 0.7 - self.classes_to_detect = 0 + self.confidence = 0.7 # YOLO Confidence level at which a pedestrian is considered detected + self.classes_to_detect = 0 # The YOLO class id's that are being detected - def rate(self): - return 4.0 - - def state_inputs(self): - return ['vehicle'] - - def state_outputs(self): - return ['agents'] + self.fusion = Fusion3D() def initialize(self): - """Initializes subscribers and publishers self.vehicle_interface.subscribe_sensor -> GEM Car camera subscription @@ -60,145 +69,137 @@ def initialize(self): self.pub_image -> Publishes Image with detection for visualization """ - #tell the vehicle to use image_callback whenever 'front_camera' gets a reading, and it expects images of type cv2.Mat + # Subscribers and sychronizers + self.rgb_rosbag = message_filters.Subscriber('/oak/rgb/image_raw', Image) + self.top_lidar_rosbag = message_filters.Subscriber('/ouster/points', PointCloud2) + self.sync = message_filters.ApproximateTimeSynchronizer([self.rgb_rosbag, self.top_lidar_rosbag], queue_size=2, slop=0.1) + self.sync.registerCallback(self.detect_pedestrians) + + def detect_pedestrians(self, front_image_msg: Image, ouster_msg: PointCloud2D): + """Fuses Image and Lidar information to detect pedestrians + """ + # TODO: make fusion_callback return some data. Call the rest of Lukas' functions and Akul's stuff inside of this function. 
+ data = self.fusion.fusion_callback(rgb_image_msg=front_image_msg, lidar_pc2_msg=ouster_msg) + + # Original plan for how to do this: + # # combined_point_cloud = self.combine_point_clouds # Removed because we only have transformation for a single lidar (ouster) at the moment. Probably should return np.array - # GEM Car subacriber - # self.vehicle_interface.subscribe_sensor('front_camera',self.image_callback,cv2.Mat) + # point_cloud = np.array(list(pc2.read_points(ouster_msg, skip_nans=True)), dtype=np.float32)[:, :3] - # Webcam - # rospy.Subscriber('/webcam', Image, self.image_callback) + # image_pedestrians = self.detect_pedestrians_in_image(image=front_image_msg) - # Testing with rosbag - self.rosbag_test = rospy.Subscriber('/oak/rgb/image_raw',Image, self.image_callback,queue_size=1) - if(self.visualization): - self.pub_image = rospy.Publisher("/camera/image_detection", Image, queue_size=1) - pass - - # Use cv2.Mat for GEM Car, Image for RosBag - def image_callback(self, image : Image): #image : cv2.Mat): + # for id in image_pedestrians: + # #### Scrum-20: Calculate and Convert Image Points to Lidar Frame of Reference Task: + # (estimated_ped_cloud, flat_center) = self.extract_ped_cloud(point_cloud=point_cloud, image_pedestrian=image_pedestrians[key]) - """Detects pedestrians using the model provided when new image is passed. + # #### Scrum-21 & 39:Calculate Pedestrian Center and Dimensions + # # Determine a more exact center and dimensions of the pedestrian by ignoring background point cloud points + # (pose, dims) = self.calc_ped_center_dims(estimated_ped_cloud, flat_center) + # image_pedestrians[id].pose = pose + # image_pedestrians[id].dims = dims - Converts Image.msg to cv2 format (Might need to change this to use cv2.Mat) and uses the model to detect pedestrian - IF visualization is true, will publish an image with pedestrians detected. 
+ # #### Scrum-35: Associate and Track Pedestrian Id's + # self.associate_and_track_peds(image_pedestrians) - Hardcoded values for now: - Detected only pedestrians -> Class = 0 - Confidence level -> 0.7 + # TODO: refactor into pedestrian_detection.py + def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: + self.prev_agents = self.current_agents + return self.prev_agents + + # clusters: list[ np.array(shape = (N, XYZ) ] + # TODO: Improve Algo Knn, ransac, etc. + def find_centers(clusters: list[np.ndarray]): #TODO: comment specifies np.array, type hint specifies np.ndarray + centers = [np.mean(clust, axis=0) for clust in clusters] + return centers + + # Beware: AgentState(PhysicalObject) builds bbox from + # dims [-l/2,l/2] x [-w/2,w/2] x [0,h], not + # [-l/2,l/2] x [-w/2,w/2] x [-h/2,h/2] + def find_dims(clusters): + # Add some small constant to height to compensate for + # objects distance to ground we filtered from lidar, etc.? + dims = [np.max(clust, axis= 0) - np.min(clust, axis= 0) for clust in clusters] + return dims + + # track_result: self.detector.track (YOLO) output + # flattened_pedestrians_3d_pts: Camera frame points, ind + # corresponds to object + # TODO: Finish adapt this func + fusion.py to multiple classes + def update_object_states(track_result, flattened_pedestrians_3d_pts: list[np.ndarray]): + track_ids = track_result[0].boxes.id.int().cpu().tolist() + num_objs = len(track_ids) - """ - - # Use Image directly for GEM Car - # track_result = self.detector.track(source=image, classes=self.classes_to_detect, persist=True, conf=self.confidence) - - # Convert to CV2 format for RosBag - bridge = CvBridge() - image = bridge.imgmsg_to_cv2(image, "bgr8") - track_result = self.detector.track(source=image, classes=self.classes_to_detect, persist=True, conf=self.confidence) - - self.last_person_boxes = [] - boxes = track_result[0].boxes - - # Used for visualization - if(self.visualization): - label_text = "Pedestrian " - font = 
cv2.FONT_HERSHEY_SIMPLEX - font_scale = 0.5 - font_color = (255, 255, 255) # White text - outline_color = (0, 0, 0) # Black outline - line_type = 1 - text_thickness = 2 # Text thickness - outline_thickness = 1 # Thickness of the text outline - - # Unpacking box dimentions detected into x,y,w,h - for box in boxes: - - xywh = box.xywh[0].tolist() - self.last_person_boxes.append(xywh) - x, y, w, h = xywh - id = box.id.item() - - # Stores AgentState in a dict, can be removed if not required - pose = ObjectPose(t=0,x=x,y=y,z=0,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT) - dims = (w,h,0) - if(id not in self.pedestrians.keys()): - self.pedestrians[id] = AgentState(pose=pose,dimensions=dims,outline=None,type=AgentEnum.PEDESTRIAN,activity=AgentActivityEnum.MOVING,velocity=(0,0,0),yaw_rate=0) - else: - self.pedestrians[id].pose = pose - self.pedestrians[id].dims = dims - - # Used for visualization - if(self.visualization): - # Draw bounding box - cv2.rectangle(image, (int(x - w / 2), int(y - h / 2)), (int(x + w / 2), int(y + h / 2)), (255, 0, 255), 3) - - # Define text label - x = int(x - w / 2) - y = int(y - h / 2) - label = label_text + str(id) + " : " + str(round(box.conf.item(), 2)) - - # Get text size - text_size, baseline = cv2.getTextSize(label, font, font_scale, line_type) - text_w, text_h = text_size - - # Position text above the bounding box - text_x = x - text_y = y - 10 if y - 10 > 10 else y + h + text_h - - # Draw text outline for better visibility, uncomment for outline - # for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]: - # cv2.putText(image, label, (text_x + dx, text_y - baseline + dy), font, font_scale, outline_color, outline_thickness) - - # Draw main text on top of the outline - cv2.putText(image, label, (text_x, text_y - baseline), font, font_scale, font_color, text_thickness) - + if len(flattened_pedestrians_3d_pts) != num_objs: + raise Exception('Perception - Camera detections, points num. 
mismatch') - # Used for visualization - if(self.visualization): - ros_img = bridge.cv2_to_imgmsg(image, 'bgr8') - self.pub_image.publish(ros_img) - - #uncomment if you want to debug the detector... - # print(self.last_person_boxes) - # print(self.pedestrians.keys()) - #for bb in self.last_person_boxes: - # x,y,w,h = bb - # cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3) - #cv2.imwrite("pedestrian_detections.png",image) + # Combine funcs for efficiency in C + # Separate numpy prob still faster + obj_centers = find_centers(flattened_pedestrians_3d_pts) + obj_dims = find_dims(flattened_pedestrians_3d_pts) + + # Calculate new velocities + # TODO: Improve velocity algo + remove O(n) dict + # Moving Average across last N iterations pos/vel? + track_id_center_map = dict(zip(track_ids, obj_centers)) + # Object not seen -> velocity = None + vels = defaultdict(None) + for prev_agent in self.prev_agents: + if prev_agent.track_id in track_ids: + vels[track_id] = track_id_center_map[track_id] - np.array([prev_agent.pose.x, prev_agent.pose.y, prev_agent.pose.z]) + + # Update Current AgentStates + self.current_agents = [ + AgentState( + track_id = track_ids[ind], + pose=ObjectPose(t=0,obj_centers[0],obj_centers[1],obj_centers[2],yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT), + # (l, w, h) + # TODO: confirm (z -> l, x -> w, y -> h) + dimensions=tuple(obj_dims[ind][2], obj_dims[ind][0], obj_dims[ind][1]), + outline=None, + type=AgentEnum.PEDESTRIAN, + activity=AgentActivityEnum.MOVING, + velocity=tuple(vels[track_ids[ind]]), + yaw_rate=0 + ) + for ind in len(num_objs) + ] - def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: + def extract_ped_cloud(point_cloud: np.array, image_pedestrian: AgentState): + # return (estimated_ped_cloud, flat_center) + pass + + def calc_ped_center_dims(estimated_ped_cloud: np.array, flat_center: np.array): + #### Scrum-21 & 39:Calculate Pedestrian Center and Dimensions + # Determine a more 
exact center and dimensions of the pedestrian by ignoring background point cloud points + # return (pose, dims) + pass + + def associate_and_track_peds(image_pedestrians: dict): + #### Scrum-35: Associate and Track Pedestrian Id'sSS + + def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: res = {} for i,b in enumerate(self.last_person_boxes): x,y,w,h = b - res['pedestrian'+str(i)] = box_to_fake_agent(b) + res['pedestrian'+str(i)] = self.box_to_fake_agent(b) if len(res) > 0: print("Detected",len(res),"pedestrians") return res -class FakePedestrianDetector2D(Component): +class FakePedestrianDetector2D(PedestrianDetector2DShared): """Triggers a pedestrian detection at some random time ranges""" - def __init__(self,vehicle_interface : GEMInterface): - self.vehicle_interface = vehicle_interface + def __init__(self, vehicle_interface: GEMInterface): self.times = [(5.0,20.0),(30.0,35.0)] self.t_start = None - - def rate(self): - return 4.0 - def state_inputs(self): - return ['vehicle'] - - def state_outputs(self): - return ['agents'] - - def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: + def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: if self.t_start is None: self.t_start = self.vehicle_interface.time() t = self.vehicle_interface.time() - self.t_start res = {} for times in self.times: if t >= times[0] and t <= times[1]: - res['pedestrian0'] = box_to_fake_agent((0,0,0,0)) + res['pedestrian0'] = self.box_to_fake_agent((0,0,0,0)) print("Detected a pedestrian") return res