Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 46 additions & 83 deletions GEMstack/onboard/perception/pedestrian_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,23 +140,31 @@ def init_debug(self,) -> None:
self.pub_image = rospy.Publisher("/camera/image_detection", Image, queue_size=1)

def update(self, vehicle : VehicleState) -> Dict[str,AgentState]:

# Convert to start state.
# self.current_vehicle_state | data from GNSS of vehicle pose in GLOBAL Frame
# self.start_pose_abs | data from GNSS of the first vehicle pose in GLOBAL Frame
vehicle.pose.to_frame(ObjectFrameEnum.START,self.current_vehicle_state.pose,self.start_pose_abs)

# To be checked with Lucas
# self.prev_agents = self.current_agents.copy()
self.current_agents.clear()

# Converting to start frame
obj_centers = vehicle.pose.apply(self.current_agent_obj_dims["pose"])
obj_dims = vehicle.pose.apply(self.current_agent_obj_dims["dims"])

# Creating agent state and trackinig ids
self.find_vels_and_ids(obj_centers, obj_dims)

# Prepare variables for find_vels_and_ids
self.prev_agents = self.current_agents.deepcopy()
self.current_agents.clear()
# Note this below function will probably throw a type error. I think we'll need to index the
# tuples by index 0 in the lists but I'm not sure:
agents = self.create_agent_states(obj_centers=obj_centers, obj_dims=obj_dims)

# Calculating the velocity of agents and tracking their ids
self.find_vels_and_ids(agents=agents)

# Convert to current vehicle frame here for planning group.
# Create a new dictionary here which is a deepcopy of self.current_agents
# convert this new dictionary to current vehicle frame
# return this new dictionary
# OR just make planning group do the conversion. Depends on what data we want to log?

return self.current_agents

# TODO: Improve Algo Knn, ransac, etc.
Expand Down Expand Up @@ -236,8 +244,6 @@ def viz_object_states(self, cv_image, boxes, extracted_pts_all):


def update_object_states(self, track_result: List[Results], extracted_pts_all: List[np.ndarray]) -> None:


self.current_agent_obj_dims.clear()
# Return if no track results available
if track_result[0].boxes.id == None:
Expand Down Expand Up @@ -278,101 +284,56 @@ def update_object_states(self, track_result: List[Results], extracted_pts_all: L

# TODO: Refactor to make more efficient
# TODO: Moving Average across last N iterations pos/vel? Less spurious vals
# TODO Fix velocity calculation to calculate in ObjectFrameEnum.START
def find_vels_and_ids(self, obj_centers: List[np.ndarray], obj_dims: List[np.ndarray]):

# Gate to check whether dims and centers are empty (will happen if no pedestrians are scanned):
for idx in range(len(obj_dims) -1, -1, -1):
if (obj_centers[idx].size == 0) or (obj_dims[0].size == 0):
del obj_centers[idx]
del obj_dims[idx]

def find_vels_and_ids(self, agents: List[AgentState]):
# self.prev_agents was assigned a deepcopy of self.current_agents and then self.current_agents
# was cleared before this function was called

# Nothing was scanned, erase current (for current output) and previous list (for next time through because nothing was scanned)
if (len(obj_dims) == 0 or len(obj_centers) == 0):
if (len(agents) == 0):
self.current_agents = {}
self.prev_agents = {}
return

new_prev_agents = {} # Stores current agents in START frame for next time through (since
# planning wants us to send them agents in CURRENT frame)

assigned = False
num_pairings = len(obj_centers)
num_agents = len(agents)

# Create a list of agent states in the start frame
agents_sf = self.create_agent_states_in_start_frame(obj_centers=obj_centers, obj_dims=obj_dims) # Create agents in start frame

# Loop through the indexes of the obj_center and obj_dim pairings
for idx in range(num_pairings):
for idx in range(num_agents):
assigned = False

# Loop through previous agents backwards
for prev_id, prev_state in reversed(self.prev_agents.items()):
# If an obj_center and obj_dim pair overlaps with a previous agent, assume that they're the same agent
if self.agents_overlap(agents_sf[idx], prev_state):
# If a scanned agent overlaps with a previous agent, assume that they're the same agent
if self.agents_overlap(agents[idx], prev_state):
assigned = True

if self.prev_time == None:
# This will be triggered if the very first message has pedestrians in it
vel = np.array([0, 0, 0])
else:
delta_t = self.curr_time - self.prev_time
vel = (np.array([agents_sf[idx].pose.x, agents_sf[idx].pose.y, agents_sf[idx].pose.z]) - np.array([prev_state.pose.x, prev_state.pose.y, prev_state.pose.z])) / delta_t.total_seconds()
vel = (np.array([agents[idx].pose.x, agents[idx].pose.y, agents[idx].pose.z]) - np.array([prev_state.pose.x, prev_state.pose.y, prev_state.pose.z])) / delta_t.total_seconds()
print("VELOCITY:")
print(vel)

# Create current frame agent at planning group's request:
self.current_agents[prev_id] = (
AgentState(
track_id = prev_id,
pose=ObjectPose(t=(self.curr_time-self.t_start).total_seconds(), x=obj_centers[idx][0], y=obj_centers[idx][1], z=obj_centers[idx][2], yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT),
# (l, w, h)
# TODO: confirm (z -> l, x -> w, y -> h)
dimensions=(obj_dims[idx][0], obj_dims[idx][1], obj_dims[idx][2]),
outline=None,
type=AgentEnum.PEDESTRIAN,
activity=AgentActivityEnum.STOPPED if np.all(vel == 0) else AgentActivityEnum.MOVING,
velocity=(0, 0, 0) if vel.size == 0 else tuple(vel),
yaw_rate=0
))

# Fix start frame agent and store in this dict:
agents_sf[idx].track_id = prev_id
agents_sf[idx].activity = AgentActivityEnum.STOPPED if (np.all(vel == 0) or vel.size == 0) else AgentActivityEnum.MOVING
agents_sf[idx].velocity = (0, 0, 0) if vel.size == 0 else tuple(vel)
agents[idx].track_id = prev_id
agents[idx].activity = AgentActivityEnum.STOPPED if (np.all(vel == 0) or vel.size == 0) else AgentActivityEnum.MOVING
agents[idx].velocity = (0, 0, 0) if vel.size == 0 else tuple(vel)

new_prev_agents[prev_id] = agents_sf[idx]
del self.prev_agents[prev_id] # Remove previous agent from previous agents
self.current_agents[prev_id] = agents[idx]
del self.prev_agents[prev_id] # Remove previous agent from previous agents so it doesn't get assigned multiple times on accident
break

# If not assigned:

if not assigned:
# Set velocity to 0 and assign the new agent a new id with IdTracker
id = self.id_tracker.get_new_id()

# Create current frame agent at planning group's request:
self.current_agents[id] = (
AgentState(
track_id = id,
pose=ObjectPose(t=(self.curr_time-self.t_start).total_seconds(), x=obj_centers[idx][0], y=obj_centers[idx][1], z=obj_centers[idx][2] ,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT),
# (l, w, h)
# TODO: confirm (z -> l, x -> w, y -> h)
dimensions=(obj_dims[idx][0], obj_dims[idx][1], obj_dims[idx][2]),
outline=None,
type=AgentEnum.PEDESTRIAN,
activity=AgentActivityEnum.UNDETERMINED,
velocity=(0, 0, 0),
yaw_rate=0
))

# Fix start frame agent and store in this dict:
agents_sf[idx].track_id = id
agents_sf[idx].activity = AgentActivityEnum.UNDETERMINED
agents_sf[idx].velocity = (0, 0, 0)
new_prev_agents[id] = agents_sf[idx]
self.prev_agents = new_prev_agents
agents[idx].track_id = id
agents[idx].activity = AgentActivityEnum.UNDETERMINED
agents[idx].velocity = (0, 0, 0)
self.current_agents[id] = agents[idx]

# Calculates whether 2 agents overlap in START frame. True if they do, false if not
# Calculates whether 2 agents overlap. True if they do, false if not
def agents_overlap(self, curr_agent: AgentState, prev_agent: AgentState) -> bool:
# Calculate corners of AgentState
# Beware: AgentState(PhysicalObject) builds bbox from
Expand All @@ -396,13 +357,19 @@ def agents_overlap(self, curr_agent: AgentState, prev_agent: AgentState) -> bool
( (z1_min <= z2_min and z2_min <= z1_max) or (z2_min <= z1_min and z1_min <= z2_max) )
)

def create_agent_states_in_start_frame(self, obj_centers: List[np.ndarray], obj_dims: List[np.ndarray]) -> List[AgentState]:
def create_agent_states(self, obj_centers: List[np.ndarray], obj_dims: List[np.ndarray]) -> List[AgentState]:
# Gate to check whether dims and centers are empty (will happen if no pedestrians are scanned):
for idx in range(len(obj_dims) -1, -1, -1):
if (obj_centers[idx].size == 0) or (obj_dims[0].size == 0):
del obj_centers[idx]
del obj_dims[idx]

# Create list of agent states in current vehicle frame:
agents = []
num_pairings = len(obj_centers)
for idx in range(num_pairings):
# Create agent in current frame:
state = AgentState(
agents.append(AgentState(
track_id=0, # Temporary
pose=ObjectPose(t=(self.curr_time - self.t_start).total_seconds(), x=obj_centers[idx][0], y=obj_centers[idx][1], z=obj_centers[idx][2], yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT),
# Beware: AgentState(PhysicalObject) builds bbox from
Expand All @@ -416,12 +383,8 @@ def create_agent_states_in_start_frame(self, obj_centers: List[np.ndarray], obj_
activity=AgentActivityEnum.UNDETERMINED, # Temporary
velocity=None, # Temporary
yaw_rate=0
)

# Convert agent to start frame and add to agents list:
agents.append(state)
))

# Return the agent states converted to start frame:
return agents


Expand Down