From f2b17788951279950f835028319ffd09ae08e92b Mon Sep 17 00:00:00 2001 From: LucasEby Date: Sat, 22 Feb 2025 15:49:48 -0500 Subject: [PATCH] Updated velocity and id code to work off of data that's already in start frame --- .../perception/pedestrian_detection.py | 129 +++++++----------- 1 file changed, 46 insertions(+), 83 deletions(-) diff --git a/GEMstack/onboard/perception/pedestrian_detection.py b/GEMstack/onboard/perception/pedestrian_detection.py index 43d0260fa..ce2f6ec91 100644 --- a/GEMstack/onboard/perception/pedestrian_detection.py +++ b/GEMstack/onboard/perception/pedestrian_detection.py @@ -140,23 +140,31 @@ def init_debug(self,) -> None: self.pub_image = rospy.Publisher("/camera/image_detection", Image, queue_size=1) def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: - # Convert to start state. # self.current_vehicle_state | data from GNSS of vehicle pose in GLOBAL Frame # self.start_pose_abs | data from GNSS of the first vehicle pose in GLOBAL Frame vehicle.pose.to_frame(ObjectFrameEnum.START,self.current_vehicle_state.pose,self.start_pose_abs) - # To be checked with Lucas - # self.prev_agents = self.current_agents.copy() - self.current_agents.clear() - # Converting to start frame obj_centers = vehicle.pose.apply(self.current_agent_obj_dims["pose"]) obj_dims = vehicle.pose.apply(self.current_agent_obj_dims["dims"]) - # Creating agent state and trackinig ids - self.find_vels_and_ids(obj_centers, obj_dims) - + # Prepare variables for find_vels_and_ids + self.prev_agents = self.current_agents.deepcopy() + self.current_agents.clear() + # Note this below function will probably throw a type error. I think we'll need to index the + # tuples by index 0 in the lists but I'm not sure: + agents = self.create_agent_states(obj_centers=obj_centers, obj_dims=obj_dims) + + # Calculating the velocity of agents and tracking their ids + self.find_vels_and_ids(agents=agents) + + # Convert to current vehicle frame here for planning group. + # Create a new dictionary here which is a deepcopy of self.current_agents + # convert this new dictionary to current vehicle frame + # return this new dictionary + # OR just make planning group do the conversion. Depends on what data we want to log? + return self.current_agents # TODO: Improve Algo Knn, ransac, etc. @@ -236,8 +244,6 @@ def viz_object_states(self, cv_image, boxes, extracted_pts_all): def update_object_states(self, track_result: List[Results], extracted_pts_all: List[np.ndarray]) -> None: - - self.current_agent_obj_dims.clear() # Return if no track results available if track_result[0].boxes.id == None: @@ -278,38 +284,25 @@ def update_object_states(self, track_result: List[Results], extracted_pts_all: L # TODO: Refactor to make more efficient # TODO: Moving Average across last N iterations pos/vel? Less spurious vals - # TODO Fix velocity calculation to calculate in ObjectFrameEnum.START - def find_vels_and_ids(self, obj_centers: List[np.ndarray], obj_dims: List[np.ndarray]): - - # Gate to check whether dims and centers are empty (will happen if no pedestrians are scanned): - for idx in range(len(obj_dims) -1, -1, -1): - if (obj_centers[idx].size == 0) or (obj_dims[0].size == 0): - del obj_centers[idx] - del obj_dims[idx] - + def find_vels_and_ids(self, agents: List[AgentState]): + # self.prev_agents was assigned a deepcopy of self.current_agents and then self.current_agents + # was cleared before this function was called + # Nothing was scanned, erase current (for current output) and previous list (for next time through because nothing was scanned) - if (len(obj_dims) == 0 or len(obj_centers) == 0): + if (len(agents) == 0): self.current_agents = {} - self.prev_agents = {} return - - new_prev_agents = {} # Stores current agents in START frame for next time through (since - # planning wants us to send them agents in CURRENT frame) assigned = False - num_pairings = len(obj_centers) + num_agents = len(agents) - # Create a list of agent states in the start frame - agents_sf = self.create_agent_states_in_start_frame(obj_centers=obj_centers, obj_dims=obj_dims) # Create agents in start frame - - # Loop through the indexes of the obj_center and obj_dim pairings - for idx in range(num_pairings): + for idx in range(num_agents): assigned = False # Loop through previous agents backwards for prev_id, prev_state in reversed(self.prev_agents.items()): - # If an obj_center and obj_dim pair overlaps with a previous agent, assume that they're the same agent - if self.agents_overlap(agents_sf[idx], prev_state): + # If a scanned agent overlaps with a previous agent, assume that they're the same agent + if self.agents_overlap(agents[idx], prev_state): assigned = True if self.prev_time == None: @@ -317,62 +310,30 @@ def find_vels_and_ids(self, obj_centers: List[np.ndarray], obj_dims: List[np.nda vel = np.array([0, 0, 0]) else: delta_t = self.curr_time - self.prev_time - vel = (np.array([agents_sf[idx].pose.x, agents_sf[idx].pose.y, agents_sf[idx].pose.z]) - np.array([prev_state.pose.x, prev_state.pose.y, prev_state.pose.z])) / delta_t.total_seconds() + vel = (np.array([agents[idx].pose.x, agents[idx].pose.y, agents[idx].pose.z]) - np.array([prev_state.pose.x, prev_state.pose.y, prev_state.pose.z])) / delta_t.total_seconds() print("VELOCITY:") print(vel) - - # Create current frame agent at planning group's request: - self.current_agents[prev_id] = ( - AgentState( - track_id = prev_id, - pose=ObjectPose(t=(self.curr_time-self.t_start).total_seconds(), x=obj_centers[idx][0], y=obj_centers[idx][1], z=obj_centers[idx][2], yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT), - # (l, w, h) - # TODO: confirm (z -> l, x -> w, y -> h) - dimensions=(obj_dims[idx][0], obj_dims[idx][1], obj_dims[idx][2]), - outline=None, - type=AgentEnum.PEDESTRIAN, - activity=AgentActivityEnum.STOPPED if np.all(vel == 0) else AgentActivityEnum.MOVING, - velocity=(0, 0, 0) if vel.size == 0 else tuple(vel), - yaw_rate=0 - )) # Fix start frame agent and store in this dict: - agents_sf[idx].track_id = prev_id - agents_sf[idx].activity = AgentActivityEnum.STOPPED if (np.all(vel == 0) or vel.size == 0) else AgentActivityEnum.MOVING - agents_sf[idx].velocity = (0, 0, 0) if vel.size == 0 else tuple(vel) + agents[idx].track_id = prev_id + agents[idx].activity = AgentActivityEnum.STOPPED if (np.all(vel == 0) or vel.size == 0) else AgentActivityEnum.MOVING + agents[idx].velocity = (0, 0, 0) if vel.size == 0 else tuple(vel) - new_prev_agents[prev_id] = agents_sf[idx] - del self.prev_agents[prev_id] # Remove previous agent from previous agents + self.current_agents[prev_id] = agents[idx] + del self.prev_agents[prev_id] # Remove previous agent from previous agents so it doesn't get assigned multiple times on accident break - - # If not assigned: + if not assigned: # Set velocity to 0 and assign the new agent a new id with IdTracker id = self.id_tracker.get_new_id() - # Create current frame agent at planning group's request: - self.current_agents[id] = ( - AgentState( - track_id = id, - pose=ObjectPose(t=(self.curr_time-self.t_start).total_seconds(), x=obj_centers[idx][0], y=obj_centers[idx][1], z=obj_centers[idx][2] ,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT), - # (l, w, h) - # TODO: confirm (z -> l, x -> w, y -> h) - dimensions=(obj_dims[idx][0], obj_dims[idx][1], obj_dims[idx][2]), - outline=None, - type=AgentEnum.PEDESTRIAN, - activity=AgentActivityEnum.UNDETERMINED, - velocity=(0, 0, 0), - yaw_rate=0 - )) - # Fix start frame agent and store in this dict: - agents_sf[idx].track_id = id - agents_sf[idx].activity = AgentActivityEnum.UNDETERMINED - agents_sf[idx].velocity = (0, 0, 0) - new_prev_agents[id] = agents_sf[idx] - self.prev_agents = new_prev_agents + agents[idx].track_id = id + agents[idx].activity = AgentActivityEnum.UNDETERMINED + agents[idx].velocity = (0, 0, 0) + self.current_agents[id] = agents[idx] - # Calculates whether 2 agents overlap in START frame. True if they do, false if not + # Calculates whether 2 agents overlap. True if they do, false if not def agents_overlap(self, curr_agent: AgentState, prev_agent: AgentState) -> bool: # Calculate corners of AgentState # Beware: AgentState(PhysicalObject) builds bbox from @@ -396,13 +357,19 @@ def agents_overlap(self, curr_agent: AgentState, prev_agent: AgentState) -> bool ( (z1_min <= z2_min and z2_min <= z1_max) or (z2_min <= z1_min and z1_min <= z2_max) ) ) - def create_agent_states_in_start_frame(self, obj_centers: List[np.ndarray], obj_dims: List[np.ndarray]) -> List[AgentState]: + def create_agent_states(self, obj_centers: List[np.ndarray], obj_dims: List[np.ndarray]) -> List[AgentState]: + # Gate to check whether dims and centers are empty (will happen if no pedestrians are scanned): + for idx in range(len(obj_dims) -1, -1, -1): + if (obj_centers[idx].size == 0) or (obj_dims[0].size == 0): + del obj_centers[idx] + del obj_dims[idx] + # Create list of agent states in current vehicle frame: agents = [] num_pairings = len(obj_centers) for idx in range(num_pairings): # Create agent in current frame: - state = AgentState( + agents.append(AgentState( track_id=0, # Temporary pose=ObjectPose(t=(self.curr_time - self.t_start).total_seconds(), x=obj_centers[idx][0], y=obj_centers[idx][1], z=obj_centers[idx][2], yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT), # Beware: AgentState(PhysicalObject) builds bbox from @@ -416,12 +383,8 @@ def create_agent_states_in_start_frame(self, obj_centers: List[np.ndarray], obj_ activity=AgentActivityEnum.UNDETERMINED, # Temporary velocity=None, # Temporary yaw_rate=0 - ) - - # Convert agent to start frame and add to agents list: - agents.append(state) + )) - # Return the agent states converted to start frame: return agents