Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added Part01/Animation_Output/AnimationVideo.mp4
Binary file not shown.
Binary file added Part01/Animation_Output/TerminalOutput.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added Part01/__pycache__/a_star.cpython-38.pyc
Binary file not shown.
Binary file added Part01/__pycache__/a_star.cpython-39.pyc
Binary file not shown.
Binary file added Part01/__pycache__/action_handler.cpython-38.pyc
Binary file not shown.
Binary file added Part01/__pycache__/action_handler.cpython-39.pyc
Binary file not shown.
Binary file added Part01/__pycache__/environment.cpython-38.pyc
Binary file not shown.
Binary file added Part01/__pycache__/environment.cpython-39.pyc
Binary file not shown.
Binary file added Part01/__pycache__/node.cpython-38.pyc
Binary file not shown.
Binary file added Part01/__pycache__/node.cpython-39.pyc
Binary file not shown.
Binary file added Part01/__pycache__/node_manager.cpython-38.pyc
Binary file not shown.
Binary file added Part01/__pycache__/node_manager.cpython-39.pyc
Binary file not shown.
Binary file added Part01/__pycache__/obstacle_model.cpython-38.pyc
Binary file not shown.
Binary file added Part01/__pycache__/obstacle_model.cpython-39.pyc
Binary file not shown.
187 changes: 86 additions & 101 deletions a_star_arshad_dhinesh_tej.py → Part01/a_star.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ def visited_node_check(self, node, list_x, list_y, list_theta, robot_radius):
theta_threshold_min = node[2] - 10 #theta threshold min = node's theta -30
theta_threshold_max = node[2] + 10 #theta threshold max = node's theta +30

if _x.size > 0:
thetas = list_theta[_x.ravel()]
# if _x.size > 0:
# thetas = list_theta[_x.ravel()]

range_thetas = np.where(np.logical_and(thetas>=theta_threshold_min, thetas<=theta_threshold_max))
# range_thetas = np.where(np.logical_and(thetas>=theta_threshold_min, thetas<=theta_threshold_max))

_len = len(range_thetas[0])
if _len > 0:
return True, _x[range_thetas[0][0]]
# _len = len(range_thetas[0])
# if _len > 0:
# return True, _x[range_thetas[0][0]]

if len(_x) > 0: #if there are elements in dist_matrix that are less than 0.5
for i in _x: #loop through all elements in dist_matrix that are less than 0.5
if list_theta[i] > theta_threshold_min and list_theta[i] < theta_threshold_max: #if theta of node in list is within theta threshold
return True, i #Then node is in the given list
# return True, _x[0][0]
return False, None #Else node is not in the given list
# for i in _x: #loop through all elements in dist_matrix that are less than 0.5
# if list_theta[i] > theta_threshold_min and list_theta[i] < theta_threshold_max: #if theta of node in list is within theta threshold
# return True, i #Then node is in the given list
return True, _x[0]
# return False, None #Else node is not in the given list
else:
return False, None #Else node is not in the given list

Expand Down Expand Up @@ -95,7 +95,6 @@ def explore_actions(self, parent_node) -> bool:
if Status:

simulated_position , Total_Cost_To_Come, Total_Cost_To_Go = Sim_Pose

#else of the state has already been explored then ignore the action
status, idx = self.visited_node_check(simulated_position, node_manager.node_x[self.visited_node_list], \
node_manager.node_y[self.visited_node_list], node_manager.node_theta[self.visited_node_list], self.robot_radius)
Expand All @@ -110,6 +109,7 @@ def explore_actions(self, parent_node) -> bool:
NewNode = node_manager.make_node(simulated_position, Total_Cost_To_Come, Total_Cost_To_Go)
#Update the parent for the node
NewNode.Parent_Node_hash = parent_node.Node_hash
NewNode.Action_to_Node = action
heapq.heappush(self.pending_state_que, NewNode)
self.node_arrow_list[parent_node.Node_hash].append(NewNode.Node_hash)

Expand All @@ -128,6 +128,7 @@ def explore_actions(self, parent_node) -> bool:
node_manager.global_node_directory[idx].Node_State = simulated_position

NewNode = node_manager.global_node_directory[idx]
NewNode.Action_to_Node = action
heapq.heapify(self.pending_state_que)

#Push new node to the pending que for future expoloration
Expand All @@ -136,7 +137,7 @@ def explore_actions(self, parent_node) -> bool:
# heapq.heappush(self.pending_state_que, NewNode)


def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_threshold) -> bool:
def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_threshold, display_env) -> bool:
"""Takes start stat and goal state for the environement
and computes the tree using _Breadth first search algorithm till the goal state is reached

Expand All @@ -154,6 +155,7 @@ def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_thresh
self.initial_state = start_state
self.goal_dist_threshold = goal_dist_threshold
self.Final_Node = None
self.rendered_nodes = 0

#Initialize search que. This stores the nodes that needs to be explored
start_node = node_manager.make_node(self.initial_state)
Expand All @@ -165,7 +167,15 @@ def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_thresh

#Perform search till a goal state is reached or maximum number of iterations reached

render_freq = 100
render_freq_counter = 0
while True:
render_freq_counter += 1
if render_freq_counter % render_freq == 0:
self.visualize_exploration(start_state, goal_state, display_env, self.robot_radius, self.rendered_nodes)
self.rendered_nodes = len(self.visited_node_list)


if len(self.pending_state_que) > 0:
#fetch next node to be explored
next_item = heapq.heappop(self.pending_state_que)
Expand All @@ -186,6 +196,40 @@ def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_thresh
else:
print("Unable to find the goal state!!!")
return False

def back_track_actions(self):
    """Back-track from the final goal node to the start node and collect
    the wheel-velocity commands and headings along the path, in forward
    (start-to-goal) order.

    Returns:
        tuple(list, list, list): (ul, ur, theta) — left-wheel speeds,
        right-wheel speeds and the heading at each step — or ``None``
        when no goal node has been found yet.
    """
    # Guard clause: nothing to back-track until the search has succeeded.
    if self.Final_Node is None:
        return None

    ul = []
    ur = []
    theta = []

    # Seed with the action that produced the goal node itself.
    last_node = self.Final_Node
    ul.append(last_node.Action_to_Node[0])
    ur.append(last_node.Action_to_Node[1])
    theta.append(last_node.Node_State[2])

    # Walk parent links until the start node (whose parent hash is None).
    while True:
        last_node = node_manager.global_node_directory[last_node.Parent_Node_hash]

        if last_node.Parent_Node_hash is None:
            break

        if last_node.Action_to_Node is not None:
            ul.append(last_node.Action_to_Node[0])
            ur.append(last_node.Action_to_Node[1])
            # Heading comes from the node this action started from.
            theta.append(node_manager.global_node_directory[last_node.Parent_Node_hash].Node_State[2])

    # Collected goal-to-start; reverse into execution order.
    ul.reverse()
    ur.reverse()
    theta.reverse()
    return ul, ur, theta

def back_track(self):
"""Funciton to find the final trajectory given the final goal node
Expand All @@ -200,17 +244,17 @@ def back_track(self):
#back track untill the start node has been reached
while True:
last_node = node_manager.global_node_directory[last_node.Parent_Node_hash]
if last_node.Parent_Node_hash != None:
trajectory.append(last_node.Node_State)
else:
trajectory.append(last_node.Node_State)

if last_node.Parent_Node_hash == None:
break

trajectory.reverse()
return trajectory

return None

def visualize_exploration(self, display_environment, robot_radius):
def visualize_exploration(self, start_state, goal_state, display_environment, robot_radius, render_node_start_index = 0):
"""Visualize exploration of the nodes
"""
#indicate start and goal nodes with circles5
Expand All @@ -222,10 +266,13 @@ def visualize_exploration(self, display_environment, robot_radius):
#variable to control gui update rate
update_count = 0
# Loop through all visited nodes and show on the cv window
for start in self.node_arrow_list:
for end in self.node_arrow_list[start]:
display_environment.update_action(node_manager.global_node_directory[start].Node_State, \
node_manager.global_node_directory[end].Node_State)
for idx in self.visited_node_list[render_node_start_index:]:
if node_manager.global_node_directory[idx].Parent_Node_hash != None:
parent_idx = node_manager.global_node_directory[idx].Parent_Node_hash
pose = (node_manager.node_x[parent_idx], node_manager.node_y[parent_idx], \
node_manager.node_theta[parent_idx])
display_environment.update_action(pose, \
node_manager.global_node_directory[idx].Action_to_Node, self.env.is_valid_position)

update_count +=1
#Update gui every 300 iteration
Expand All @@ -244,6 +291,24 @@ def visualize_trajectory(self, trajectory, display_environment, robot_radius):
Args:
trajectory (lsit of tuples): trajectory
"""
update_count = 0
# Loop through all visited nodes and show on the cv window
for node in self.pending_state_que:

if node.Parent_Node_hash != None:
parent_idx = node.Parent_Node_hash
pose = (node_manager.node_x[parent_idx], node_manager.node_y[parent_idx], \
node_manager.node_theta[parent_idx])
display_environment.update_action(pose, \
node.Action_to_Node, self.env.is_valid_position)

update_count +=1
#Update gui every 300 iteration
if update_count == 500:
update_count = 0
cv.waitKey(1)


#Loop through all points in the trajectory
for point in trajectory:
display_environment.highlight_point(point)
Expand Down Expand Up @@ -273,85 +338,5 @@ def animate_trajectory(self, trajectory, display_environment, robot_radius):

cv.waitKey(1)

if __name__ == "__main__":


print("Enter obstacle inflation size : ")
obstacle_inflation = int(input())
print("Enter robot radius : ")
robot_radius = int(input())
goal_dist_threshold = 1.5 * robot_radius

print("Note: Enter step greater than 0.5 * robot radius")
print(f"Enter robot's action step size: [{ceil(0.5 * robot_radius)} - 10]")
step_size = int(input())

while step_size < 1 or step_size > 10: #check if the start node is outside the map
print("\nAction step size out of range [1-10]. Re-Enter the action Step size: ")
step_size = int(input())

#Create environment
print("Please wait while creating the environment.")
#Crate environment for showing the final visualization--------------------------
#This does not show the additional inflation for robot radius. Instead, it will inflate the robot size
display_environment = environment(250, 600, obstacle_inflation)
display_environment.create_map()
#-------------------------------------------------------------------------------

_environment = environment(250, 600, robot_radius+obstacle_inflation)
_environment.create_map()
print("Environment created successfully.")

# #Getting start node from the user:
print("\nEnter Start Node as integers: x,y,theta")
start_y, start_x, start_theta = input().replace(" ", "").split(',')
start_state = (int(start_x.strip()), int(start_y.strip()), int(start_theta.strip()))


while(not _environment.is_valid_position(start_state)) or start_state[2] % 30 != 0: #check if the start node is outside the map
print("\nStart Node is out of the Map or theta is not a multiple of 30. Re-Enter the Start node: ")
start_y, start_x, start_theta = input().replace(" ", "").split(',')
start_state = (int(start_x.strip()), int(start_y.strip()), int(start_theta.strip()))


# Getting goal node from the user:
print("Enter Goal Node as integers: x,y,theta")
goal_y, goal_x, goal_theta = input().replace(" ", "").split(',')
goal_state = (int(goal_x.strip()), int(goal_y.strip()), int(goal_theta.strip()))

while(not _environment.is_valid_position(goal_state)) or goal_state[2] % 30 != 0: #check if the goal node is outside the map
print("\nGoal Node is out of the Map or theta is not a multiple of 30 Re-Enter the Goal node: ")
goal_y, goal_x,goal_theta = input().replace(" ", "").split(',')
goal_state = (int(goal_x.strip()), int(goal_y.strip()), int(goal_theta.strip()))



#Create action handler for the environment
_actionHandler = action_handler(_environment, step_size, start_state, goal_state)

#create planner
_astar_planner = astar_planner(_environment, _actionHandler)

#Request planner to find a plan
print("Planning started.")
print(f"Start Position : {[start_state[1], start_state[0]]}, Goal Postion : {[goal_state[1], goal_state[0]]}")
print("Please wait while searching for the goal state...")
start_time = time.time()
_status = _astar_planner.find_goal_node(start_state, goal_state, robot_radius, goal_dist_threshold)
elspsed_time = time.time() - start_time

print(f"Total time taken to run the algorithm : {elspsed_time} seconds\n")

#If Planner is successfull, visualize the plan
if _status:
trajectory = _astar_planner.back_track()
if trajectory != None:
# display_environment.begin_video_writer()
_astar_planner.visualize_exploration(display_environment, robot_radius)
_astar_planner.visualize_trajectory(trajectory, display_environment, robot_radius)
_astar_planner.animate_trajectory(trajectory, display_environment, robot_radius)
# display_environment.close_video_writer()

display_environment.save_image("Solution.png")
cv.waitKey(0)

92 changes: 92 additions & 0 deletions Part01/action_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import math
import time
import numpy as np

class action_handler:
    """Handles the differential-drive action set supported by the environment.

    Simulates each wheel-velocity action with forward Euler integration and
    reports the resulting pose plus cost estimates for the A* planner.
    """

    def __init__(self, env, step_size, start_pose, goal_pose, omega1, omega2, wheel_radius, wheel_base_width) -> None:
        """Initialize action handler parameters.

        Args:
            env (environment): reference to the environment (must expose
                ``is_valid_position(pose)``)
            step_size (float): simulated duration (s) of one action
            start_pose (tuple): (x, y, theta_deg) start pose
            goal_pose (tuple): (x, y, theta_deg) goal pose
            omega1 (float): first wheel angular velocity
            omega2 (float): second wheel angular velocity
            wheel_radius (float): robot wheel radius
            wheel_base_width (float): distance between the two wheels
        """
        self.env = env
        self.start_pose = start_pose
        self.goal_pose = goal_pose
        self.step_size = step_size
        self.robot_wheel_radius = wheel_radius
        self.wheel_base_width = wheel_base_width

        # Define all (left, right) wheel-velocity actions possible in the environment.
        self.Actions = [(0, omega1), (omega1,0), (omega1, omega1), (0, omega2), (omega2, 0), (omega2,omega2), (omega1, omega2), (omega2, omega1)]
        print(f"ACTION SET : {self.Actions}")

    def cost_norm(self, pose1, pose2):
        """Return the Euclidean (x, y) distance between two poses; theta is ignored."""
        x1, y1, _ = pose1
        x2, y2, _ = pose2
        return np.linalg.norm([x1 - x2, y1 - y2])

    def ActionValues(self, pose, action):
        """Forward-simulate one action from *pose* with Euler integration.

        Args:
            pose (tuple): (x, y, theta_deg) starting pose
            action (tuple): (UL, UR) left/right wheel angular velocities

        Returns:
            tuple: ((x, y, theta_deg) final pose, path length travelled), or
            (None, None) when any intermediate pose leaves the free space.
        """
        Xi, Yi, Thetai = pose
        UL, UR = action

        t = 0
        dt = 0.1  # Euler integration time step (s)

        Xn = Xi
        Yn = Yi
        # BUG FIX: was `3.14 * Thetai / 180` — use exact pi for the conversion.
        Thetan = math.radians(Thetai)

        travelled = 0  # accumulated arc length of this action (its cost-to-come increment)
        while t < self.step_size:
            t = t + dt

            Delta_Xn = 0.5 * self.robot_wheel_radius * (UL + UR) * math.cos(Thetan) * dt
            Delta_Yn = 0.5 * self.robot_wheel_radius * (UL + UR) * math.sin(Thetan) * dt

            Thetan += (self.robot_wheel_radius / self.wheel_base_width) * (UR - UL) * dt

            # hypot(A*cos, A*sin) == |A|, numerically identical to the original
            # sqrt(pow(...) + pow(...)) form but clearer and more robust.
            travelled = travelled + math.hypot(Delta_Xn, Delta_Yn)

            Xn += Delta_Xn
            Yn += Delta_Yn

            # Abort the whole action as soon as it clips an obstacle / map edge.
            if not self.env.is_valid_position((round(Xn), round(Yn), Thetan)):
                return None, None

        # BUG FIX: was `180 * Thetan / 3.14` — exact radian-to-degree conversion.
        Thetan = math.degrees(Thetan)
        return (round(Xn), round(Yn), Thetan % 360), travelled

    def PerformAction(self, parent_node, action):
        """Compute the next state and estimate the total cost for the next state.

        Args:
            parent_node (node): node the action starts from
            action (tuple): (UL, UR) wheel velocities

        Returns:
            tuple: (True, (pose, cost_to_come, cost_to_go)) when the action is
            valid, else (False, None).
        """
        agents_pose = parent_node.Node_State

        # Simulate agent's next position.
        simulated_position, action_cost = self.ActionValues(agents_pose, action)

        if simulated_position is not None:
            # Total path cost accumulated so far plus this action's arc length.
            action_cost_to_come = parent_node.Cost_to_Come + action_cost

            # Heuristic: straight-line distance to the goal (admissible).
            action_cost_to_go = self.cost_norm(simulated_position, self.goal_pose)  # +abs(agents_pose[2] - simulated_position[2])

            # Return the estimated position and cost.
            return True, (simulated_position, action_cost_to_come, action_cost_to_go)

        else:
            return False, None

Loading