Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions GEMstack/onboard/perception/AgentTracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import math
from typing import Dict, List

from GEMstack.onboard.perception.IdTracker import IdTracker
from GEMstack.onboard.perception.PrevAgent import PrevAgent
from GEMstack.state.agent import AgentState


class AgentTracker():
    """Associates and tracks AgentState agents across frames.

    Detections (assumed to already be expressed in the vehicle's start
    frame) are matched to previously tracked agents by overlap of their
    pose/dimension bounding boxes. Matched agents keep their id, unmatched
    detections receive a new unique id, and tracked agents unseen for
    ``drop_agent_t`` seconds or longer are dropped.
    """
    def __init__(self):
        # List of PrevAgent objects (each keeps track of the last seen state and time since seen)
        self.prev_agents: List[PrevAgent] = []
        # List of currently visible AgentState objects
        self.current_agents: List[AgentState] = []
        # Maximum time (in seconds) to keep a lost agent before dropping it.
        self.drop_agent_t: float = 1.0
        # Id tracker for creating new unique pedestrian IDs.
        self.id_tracker = IdTracker()

    def assign_ids(self, agents: list, dt: float = 1.0) -> "Dict[str, AgentState]":
        """
        Associates new AgentState objects with existing tracked agents based on overlap.

        Two agents are considered the same pedestrian when the previously
        stored AgentState's bounding box (built from pose and dimensions)
        overlaps the newly observed one's. If an agent does not match any
        previously tracked agent, a new unique id is assigned. Agents not
        matched this frame accumulate "lost" time and are dropped once that
        time reaches drop_agent_t.

        Parameters:
            agents (list): List of AgentState objects for the current frame
                           (already converted to the start frame of reference).
            dt (float): Elapsed time in seconds since the previous frame.
                        Defaults to 1.0, the time-step that was previously
                        hard-coded; pass the real frame interval if known.

        Returns:
            Dict[str, AgentState]: Dictionary mapping agent id (as a string) to AgentState.
        """
        output_agents: Dict[str, AgentState] = {}
        matched_ids = set()
        updated_prev_agents: List[PrevAgent] = []

        # Process each new agent from the current frame.
        for new_agent in agents:
            found_match = None
            # Greedily take the first previously tracked, still-unclaimed
            # agent whose bounding box overlaps.
            for prev in self.prev_agents:
                if prev.last_id in matched_ids:
                    continue  # Already matched with another new agent.
                if self.__agents_overlap(prev.last_state, new_agent):
                    found_match = prev
                    break

            if found_match is not None:
                # Update velocity (scalar speed) using the change in position.
                # NOTE(review): AgentState.velocity may be a vector elsewhere
                # in the stack -- confirm the expected type.
                if hasattr(new_agent, 'velocity'):
                    new_agent.velocity = self.__calculate_velocity(found_match.last_state, new_agent, dt)

                # Update the matched agent's state and reset its lost-time counter.
                found_match.last_state = new_agent
                found_match.time_since_seen = 0.0
                matched_ids.add(found_match.last_id)
                output_agents[str(found_match.last_id)] = new_agent
                updated_prev_agents.append(found_match)
            else:
                # No match found - assign a new unique id.
                new_id = self.id_tracker.get_new_ped_id()
                new_prev = PrevAgent(new_id, new_agent)
                # Initialize velocity to 0 for a first sighting.
                if hasattr(new_agent, 'velocity'):
                    new_agent.velocity = 0.0
                output_agents[str(new_id)] = new_agent
                updated_prev_agents.append(new_prev)

        # For all previously tracked agents that were not matched this frame,
        # update their time-since-seen and only keep them if they have not timed out.
        for prev in self.prev_agents:
            if prev.last_id not in matched_ids:
                prev.update_time(dt)
                if prev.time_since_seen < self.drop_agent_t:
                    updated_prev_agents.append(prev)

        # Save the updated list of tracked agents.
        self.prev_agents = updated_prev_agents
        # current_agents holds only those agents seen in the current frame.
        self.current_agents = list(output_agents.values())
        return output_agents

    def __convert_to_start_frame(self):
        """Converts a list of AgentState agents from the ouster Lidar frame of
        reference (which is relative to the current vehicle frame) to the
        start frame of reference.
        """
        # TODO: not yet implemented.
        pass

    def __agents_overlap(self, ped1, ped2) -> bool:
        """
        Determines if two AgentState objects overlap based on their pose and dimensions.

        Assumes each AgentState has:
            - pose with attributes x and y.
            - dimensions with attributes width and height.

        Returns:
            bool: True if they overlap; False otherwise.
        """
        # Get first agent's properties.
        x1, y1 = ped1.pose.x, ped1.pose.y
        w1, h1 = ped1.dimensions.width, ped1.dimensions.height

        # Get second agent's properties.
        x2, y2 = ped2.pose.x, ped2.pose.y
        w2, h2 = ped2.dimensions.width, ped2.dimensions.height

        # Compute axis-aligned bounding boxes (assuming (x, y) is the center).
        left1, right1 = x1 - w1 / 2, x1 + w1 / 2
        top1, bottom1 = y1 - h1 / 2, y1 + h1 / 2

        left2, right2 = x2 - w2 / 2, x2 + w2 / 2
        top2, bottom2 = y2 - h2 / 2, y2 + h2 / 2

        # Boxes overlap unless one lies entirely to one side of the other.
        overlap = not (right1 < left2 or left1 > right2 or bottom1 < top2 or top1 > bottom2)
        return overlap

    def __calculate_velocity(self, old_state: "AgentState", new_state: "AgentState", dt: float) -> float:
        """
        Calculates the (scalar) velocity based on the change in pose over time.

        Parameters:
            old_state (AgentState): The previous state.
            new_state (AgentState): The current state.
            dt (float): Time difference between the two states.

        Returns:
            float: The computed speed (Euclidean distance / dt), or 0.0 if dt <= 0.
        """
        dx = new_state.pose.x - old_state.pose.x
        dy = new_state.pose.y - old_state.pose.y
        return math.sqrt(dx * dx + dy * dy) / dt if dt > 0 else 0.0
12 changes: 12 additions & 0 deletions GEMstack/onboard/perception/IdTracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class IdTracker():
    """Abstracts out pedestrian id tracking.

    Hands out sequential integer ids starting at 0.
    """
    def __init__(self):
        # Next pedestrian id to hand out (private, name-mangled).
        self.__ped_id = 0

    def get_new_ped_id(self) -> int:
        """Returns a unique pedestrian id and advances the counter."""
        # Bug fix: previously read `self.__id`, an attribute that does not
        # exist (AttributeError on every call); the counter is `self.__ped_id`.
        assigned_id = self.__ped_id
        # NOTE(review): Python ints do not overflow, so the original
        # "intentionally overflow back to 0" comment does not apply.
        self.__ped_id += 1
        return assigned_id
17 changes: 17 additions & 0 deletions GEMstack/onboard/perception/PrevAgent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# GEM imports:
from ...state import AgentState

class PrevAgent():
    """Bookkeeping record for a single tracked agent: its assigned id,
    its last observed state, and how long since it was last seen.
    """
    def __init__(self, last_id: int, last_state: "AgentState"):
        # Time since the agent was last seen, in seconds.
        self.time_since_seen: float = 0.0
        # Unique id assigned to this agent.
        self.last_id: int = last_id
        # Most recently observed AgentState for this agent.
        self.last_state: "AgentState" = last_state

    def update_time(self, time: float):
        """Adds `time` (seconds) to the time since the agent was last seen.

        Non-positive values are rejected (logged, no state change).
        """
        # Bug fix: `self` was missing from the parameter list, so any bound
        # call (e.g. prev.update_time(dt)) raised a TypeError.
        if time <= 0:
            # TODO: log error here
            print("UPDATE TIME FOR LOST AGENT WAS LESS THAN OR EQUAL TO 0")
        else:
            self.time_since_seen += time
89 changes: 11 additions & 78 deletions GEMstack/onboard/perception/fusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# YOLO + GEM
from ultralytics import YOLO
from fusion_utils import *
from ...state import AgentState


class Fusion3D():
def __init__(self):
Expand All @@ -19,95 +21,27 @@ def __init__(self):
self.current_dets_bboxes = []
self.prev_agents = [] # dict{id: agent} is more efficient, but list for
self.current_agents = [] # simplicity to match update() output to start
self.visualization = True
self.confidence = 0.7
self.classes_to_detect = 0
self.ground_threshold = 1.6
self.max_dist_percent = 0.7

# Setup visualization variables
self.visualization = True # Set this to true for visualization, later change to get value from sys arg

# Load calibration data
self.R = load_extrinsics(os.getcwd() + '/GEMstack/onboard/perception/calibration/extrinsics/R.npy')
self.t = load_extrinsics(os.getcwd() + '/GEMstack/onboard/perception/calibration/extrinsics/t.npy')
self.K = load_intrinsics(os.getcwd() + '/GEMstack/onboard/perception/calibration/camera_intrinsics.json')

# Subscribers and sychronizers
self.rgb_rosbag = message_filters.Subscriber('/oak/rgb/image_raw', Image)
self.top_lidar_rosbag = message_filters.Subscriber('/ouster/points', PointCloud2)
self.sync = message_filters.ApproximateTimeSynchronizer([self.rgb_rosbag, self.top_lidar_rosbag], queue_size=10, slop=0.1)
self.sync.registerCallback(self.fusion_callback)

# TF listener to get transformation from LiDAR to Camera
self.tf_listener = tf.TransformListener()
# self.tf_listener = tf.TransformListener()S

# Publishers
self.pub_pedestrians_pc2 = rospy.Publisher("/point_cloud/pedestrians", PointCloud2, queue_size=10)
if(self.visualization):
self.pub_image = rospy.Publisher("/camera/image_detection", Image, queue_size=1)

# TODO: refactor into pedestrian_detection.py
def update(self, vehicle : VehicleState) -> Dict[str,AgentState]:
self.prev_agents = self.current_agents
return self.prev_agents

# clusters: list[ np.array(shape = (N, XYZ) ]
# TODO: Improve Algo Knn, ransac, etc.
def find_centers(clusters: list[np.ndarray]):
    """Return the centroid (per-axis mean point) of each cluster."""
    return [clust.mean(axis=0) for clust in clusters]

# Beware: AgentState(PhysicalObject) builds bbox from
# dims [-l/2,l/2] x [-w/2,w/2] x [0,h], not
# [-l/2,l/2] x [-w/2,w/2] x [-h/2,h/2]
def find_dims(clusters):
    """Return each cluster's per-axis extent (max - min)."""
    # Add some small constant to height to compensate for
    # objects distance to ground we filtered from lidar, etc.?
    return [np.ptp(clust, axis=0) for clust in clusters]

# track_result: self.detector.track (YOLO) output
# flattened_pedestrians_3d_pts: Camera frame points, ind
# corresponds to object
# TODO: Finish adapt this func + fusion.py to multiple classes
def update_object_states(track_result, flattened_pedestrians_3d_pts: list[np.ndarray]):
track_ids = track_result[0].boxes.id.int().cpu().tolist()
num_objs = len(track_ids)

if len(flattened_pedestrians_3d_pts) != num_objs:
raise Exception('Perception - Camera detections, points num. mismatch')

# Combine funcs for efficiency in C
# Separate numpy prob still faster
obj_centers = find_centers(flattened_pedestrians_3d_pts)
obj_dims = find_dims(flattened_pedestrians_3d_pts)

# Calculate new velocities
# TODO: Improve velocity algo + remove O(n) dict
# Moving Average across last N iterations pos/vel?
track_id_center_map = dict(zip(track_ids, obj_centers))
# Object not seen -> velocity = None
vels = defaultdict(None)
for prev_agent in self.prev_agents:
if prev_agent.track_id in track_ids:
vels[track_id] = track_id_center_map[track_id] - np.array([prev_agent.pose.x, prev_agent.pose.y, prev_agent.pose.z])

# Update Current AgentStates
self.current_agents = [
AgentState(
track_id = track_ids[ind]
pose=ObjectPose(t=0,obj_centers[0],obj_centers[1],obj_centers[2],yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT),
# (l, w, h)
# TODO: confirm (z -> l, x -> w, y -> h)
dimensions=tuple(obj_dims[ind][2], obj_dims[ind][0], obj_dims[ind][1]),
outline=None,
type=AgentEnum.PEDESTRIAN,
activity=AgentActivityEnum.MOVING,
velocity=tuple(vels[track_ids[ind]]),
yaw_rate=0
)
for ind in len(num_objs)
]


def fusion_callback(self, rgb_image_msg: Image, lidar_pc2_msg: PointCloud2):
# Convert to cv2 image and run detector
cv_image = self.bridge.imgmsg_to_cv2(rgb_image_msg, "bgr8")
Expand Down Expand Up @@ -195,10 +129,9 @@ def fusion_callback(self, rgb_image_msg: Image, lidar_pc2_msg: PointCloud2):
self.pub_image.publish(ros_img)



if __name__ == '__main__':
rospy.init_node('fusion_node', anonymous=True)
Fusion3D()
while not rospy.core.is_shutdown():
rospy.rostime.wallsleep(0.5)
# if __name__ == '__main__':
# rospy.init_node('fusion_node', anonymous=True)
# Fusion3D()
# while not rospy.core.is_shutdown():
# rospy.rostime.wallsleep(0.5)

Loading
Loading